Reindex: properly mark things as child tasks

Do this by creating a Client subclass that automatically assigns the
parentTask to all requests that come through it. Code that doesn't want
to set the parentTask can call `unwrap` on the Client to get the inner
client instance that doesn't set the parentTask. Reindex uses this for
its ClearScrollRequest so that the request will run properly after the
reindex request has been canceled.
This commit is contained in:
Nik Everett 2016-04-14 18:22:24 -04:00
parent 9da88a99da
commit cc1a55423c
10 changed files with 199 additions and 50 deletions

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
/**
* A {@linkplain Client} that sets the parent task on all requests that it makes. Use this to conveniently implement actions that cause
* many other actions.
*/
public class ParentTaskAssigningClient extends FilterClient {
private final TaskId parentTask;
/**
* Standard constructor.
*/
public ParentTaskAssigningClient(Client in, TaskId parentTask) {
super(in);
this.parentTask = parentTask;
}
/**
* Convenience constructor for building the TaskId out of what is usually at hand.
*/
public ParentTaskAssigningClient(Client in, DiscoveryNode localNode, Task parentTask) {
this(in, new TaskId(localNode.getId(), parentTask.getId()));
}
/**
* Fetch the wrapped client. Use this to make calls that don't set {@link ActionRequest#setParentTask(TaskId)}.
*/
public Client unwrap() {
return in();
}
@Override
protected < Request extends ActionRequest<Request>,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>
> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
request.setParentTask(parentTask);
super.doExecute(action, request, listener);
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
public class ParentTaskAssigningClientTests extends ESTestCase {
public void testSetsParentId() {
TaskId[] parentTaskId = new TaskId[] {new TaskId(randomAsciiOfLength(3), randomLong())};
// This mock will do nothing but verify that parentTaskId is set on all requests sent to it.
NoOpClient mock = new NoOpClient(getTestName()) {
@Override
protected < Request extends ActionRequest<Request>,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>
> void doExecute( Action<Request, Response, RequestBuilder> action, Request request,
ActionListener<Response> listener) {
assertEquals(parentTaskId[0], request.getParentTask());
super.doExecute(action, request, listener);
}
};
try (ParentTaskAssigningClient client = new ParentTaskAssigningClient(mock, parentTaskId[0])) {
// All of these should have the parentTaskId set
client.bulk(new BulkRequest());
client.search(new SearchRequest());
client.clearScroll(new ClearScrollRequest());
// Now lets verify that unwrapped calls don't have the parentTaskId set
parentTaskId[0] = TaskId.EMPTY_TASK_ID;
client.unwrap().bulk(new BulkRequest());
client.unwrap().search(new SearchRequest());
client.unwrap().clearScroll(new ClearScrollRequest());
}
}
}

View File

@ -35,7 +35,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
@ -87,14 +87,14 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final Set<String> destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<String> destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ESLogger logger; private final ESLogger logger;
private final Client client; private final ParentTaskAssigningClient client;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final SearchRequest firstSearchRequest; private final SearchRequest firstSearchRequest;
private final ActionListener<Response> listener; private final ActionListener<Response> listener;
private final Retry retry; private final Retry retry;
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, Client client, ThreadPool threadPool, public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client,
Request mainRequest, SearchRequest firstSearchRequest, ActionListener<Response> listener) { ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, ActionListener<Response> listener) {
this.task = task; this.task = task;
this.logger = logger; this.logger = logger;
this.client = client; this.client = client;
@ -409,7 +409,11 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
*/ */
ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId); clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, new ActionListener<ClearScrollResponse>() { /*
* Unwrap the client so we don't set our task as the parent. If we *did* set our ID then the clear scroll would be cancelled as
* if this task is cancelled. But we want to clear the scroll regardless of whether or not the main request was cancelled.
*/
client.unwrap().clearScroll(clearScrollRequest, new ActionListener<ClearScrollResponse>() {
@Override @Override
public void onResponse(ClearScrollResponse response) { public void onResponse(ClearScrollResponse response) {
logger.debug("Freed [{}] contexts", response.getNumFreed()); logger.debug("Freed [{}] contexts", response.getNumFreed());

View File

@ -23,9 +23,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.IndexFieldMapper; import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
@ -62,8 +61,8 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<
private final ScriptService scriptService; private final ScriptService scriptService;
private final CompiledScript script; private final CompiledScript script;
public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, ClusterState state,
ClusterState state, Client client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest,
ActionListener<Response> listener) { ActionListener<Response> listener) {
super(task, logger, client, threadPool, mainRequest, firstSearchRequest, listener); super(task, logger, client, threadPool, mainRequest, firstSearchRequest, listener);
this.scriptService = scriptService; this.scriptService = scriptService;

View File

@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
@ -73,8 +74,8 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
protected void doExecute(Task task, ReindexRequest request, ActionListener<ReindexResponse> listener) { protected void doExecute(Task task, ReindexRequest request, ActionListener<ReindexResponse> listener) {
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex, state); validateAgainstAliases(request.getSearchRequest(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex, state);
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, state, threadPool, request, listener) ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
.start(); new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, state, threadPool, request, listener).start();
} }
@Override @Override
@ -116,8 +117,9 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
* possible. * possible.
*/ */
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> { static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> {
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService,
ClusterState state, ThreadPool threadPool, ReindexRequest request, ActionListener<ReindexResponse> listener) { ParentTaskAssigningClient client, ClusterState state, ThreadPool threadPool, ReindexRequest request,
ActionListener<ReindexResponse> listener) {
super(task, logger, scriptService, state, client, threadPool, request, request.getSearchRequest(), listener); super(task, logger, scriptService, state, client, threadPool, request, request.getSearchRequest(), listener);
} }

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
@ -66,9 +67,9 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
} }
@Override @Override
protected void doExecute(Task task, UpdateByQueryRequest request, protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
ActionListener<BulkIndexByScrollResponse> listener) {
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, state, request, listener) new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, state, request, listener)
.start(); .start();
} }
@ -82,8 +83,8 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
* Simple implementation of update-by-query using scrolling and bulk. * Simple implementation of update-by-query using scrolling and bulk.
*/ */
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> { static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> {
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService,
ThreadPool threadPool, ClusterState clusterState, UpdateByQueryRequest request, ParentTaskAssigningClient client, ThreadPool threadPool, ClusterState clusterState, UpdateByQueryRequest request,
ActionListener<BulkIndexByScrollResponse> listener) { ActionListener<BulkIndexByScrollResponse> listener) {
super(task, logger, scriptService, clusterState, client, threadPool, request, request.getSearchRequest(), listener); super(task, logger, scriptService, clusterState, client, threadPool, request, request.getSearchRequest(), listener);
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.reindex; package org.elasticsearch.index.reindex;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
@ -47,8 +48,11 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient; import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@ -60,6 +64,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.test.client.NoOpClient;
@ -83,6 +88,7 @@ import java.util.function.Consumer;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton; import static java.util.Collections.singleton;
import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff; import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
@ -102,30 +108,34 @@ import static org.hamcrest.Matchers.instanceOf;
public class AsyncBulkByScrollActionTests extends ESTestCase { public class AsyncBulkByScrollActionTests extends ESTestCase {
private MyMockClient client; private MyMockClient client;
private ThreadPool threadPool; private ThreadPool threadPool;
private DummyAbstractBulkByScrollRequest mainRequest; private DummyAbstractBulkByScrollRequest testRequest;
private SearchRequest firstSearchRequest; private SearchRequest firstSearchRequest;
private PlainActionFuture<BulkIndexByScrollResponse> listener; private PlainActionFuture<BulkIndexByScrollResponse> listener;
private String scrollId; private String scrollId;
private TaskManager taskManager; private TaskManager taskManager;
private BulkByScrollTask task; private BulkByScrollTask testTask;
private Map<String, String> expectedHeaders = new HashMap<>(); private Map<String, String> expectedHeaders = new HashMap<>();
private DiscoveryNode localNode;
private TaskId taskId;
@Before @Before
public void setupForTest() { public void setupForTest() {
client = new MyMockClient(new NoOpClient(getTestName())); client = new MyMockClient(new NoOpClient(getTestName()));
threadPool = new ThreadPool(getTestName()); threadPool = new ThreadPool(getTestName());
mainRequest = new DummyAbstractBulkByScrollRequest(); testRequest = new DummyAbstractBulkByScrollRequest();
firstSearchRequest = new SearchRequest().scroll(timeValueSeconds(10)); firstSearchRequest = new SearchRequest().scroll(timeValueSeconds(10));
listener = new PlainActionFuture<>(); listener = new PlainActionFuture<>();
scrollId = null; scrollId = null;
taskManager = new TaskManager(Settings.EMPTY); taskManager = new TaskManager(Settings.EMPTY);
task = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", mainRequest); testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest);
// Fill the context with something random so we can make sure we inherited it appropriately. // Fill the context with something random so we can make sure we inherited it appropriately.
expectedHeaders.clear(); expectedHeaders.clear();
expectedHeaders.put(randomSimpleString(random()), randomSimpleString(random())); expectedHeaders.put(randomSimpleString(random()), randomSimpleString(random()));
threadPool.getThreadContext().newStoredContext(); threadPool.getThreadContext().newStoredContext();
threadPool.getThreadContext().putHeader(expectedHeaders); threadPool.getThreadContext().putHeader(expectedHeaders);
localNode = new DiscoveryNode("thenode", new LocalTransportAddress("dead.end:666"), emptyMap(), emptySet(), Version.CURRENT);
taskId = new TaskId(localNode.getId(), testTask.getId());
} }
@After @After
@ -146,14 +156,14 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
public void testScrollResponseSetsTotal() { public void testScrollResponseSetsTotal() {
// Default is 0, meaning unstarted // Default is 0, meaning unstarted
assertEquals(0, task.getStatus().getTotal()); assertEquals(0, testTask.getStatus().getTotal());
long total = randomIntBetween(0, Integer.MAX_VALUE); long total = randomIntBetween(0, Integer.MAX_VALUE);
InternalSearchHits hits = new InternalSearchHits(null, total, 0); InternalSearchHits hits = new InternalSearchHits(null, total, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0),
new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
assertEquals(total, task.getStatus().getTotal()); assertEquals(total, testTask.getStatus().getTotal());
} }
/** /**
@ -172,7 +182,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
// Use assert busy because the update happens on another thread // Use assert busy because the update happens on another thread
final int expectedBatches = batches; final int expectedBatches = batches;
assertBusy(() -> assertEquals(expectedBatches, task.getStatus().getBatches())); assertBusy(() -> assertEquals(expectedBatches, testTask.getStatus().getBatches()));
/* /*
* While we're here we can check that getting a scroll response sets the last scroll start time which makes sure the wait time * While we're here we can check that getting a scroll response sets the last scroll start time which makes sure the wait time
@ -189,7 +199,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
} }
public void testBulkResponseSetsLotsOfStatus() { public void testBulkResponseSetsLotsOfStatus() {
mainRequest.setAbortOnVersionConflict(false); testRequest.setAbortOnVersionConflict(false);
int maxBatches = randomIntBetween(0, 100); int maxBatches = randomIntBetween(0, 100);
long versionConflicts = 0; long versionConflicts = 0;
long created = 0; long created = 0;
@ -230,11 +240,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse)); responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse));
} }
new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0)); new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0));
assertEquals(versionConflicts, task.getStatus().getVersionConflicts()); assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts());
assertEquals(updated, task.getStatus().getUpdated()); assertEquals(updated, testTask.getStatus().getUpdated());
assertEquals(created, task.getStatus().getCreated()); assertEquals(created, testTask.getStatus().getCreated());
assertEquals(deleted, task.getStatus().getDeleted()); assertEquals(deleted, testTask.getStatus().getDeleted());
assertEquals(versionConflicts, task.getStatus().getVersionConflicts()); assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts());
} }
} }
@ -265,7 +275,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
assertThat(client.scrollsCleared, contains(scrollId)); assertThat(client.scrollsCleared, contains(scrollId));
// When the task is rejected we don't increment the throttled timer // When the task is rejected we don't increment the throttled timer
assertEquals(timeValueMillis(0), task.getStatus().getThrottled()); assertEquals(timeValueMillis(0), testTask.getStatus().getThrottled());
} }
/** /**
@ -345,7 +355,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
long retryAttempts = 0; long retryAttempts = 0;
for (int i = 0; i < bulksToTry; i++) { for (int i = 0; i < bulksToTry; i++) {
retryAttempts += retryTestCase(false); retryAttempts += retryTestCase(false);
assertEquals(retryAttempts, task.getStatus().getRetries()); assertEquals(retryAttempts, testTask.getStatus().getRetries());
} }
} }
@ -354,16 +364,16 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
*/ */
public void testBulkRejectionsRetryAndFailAnyway() throws Exception { public void testBulkRejectionsRetryAndFailAnyway() throws Exception {
long retryAttempts = retryTestCase(true); long retryAttempts = retryTestCase(true);
assertEquals(retryAttempts, task.getStatus().getRetries()); assertEquals(retryAttempts, testTask.getStatus().getRetries());
} }
public void testPerfectlyThrottledBatchTime() { public void testPerfectlyThrottledBatchTime() {
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
mainRequest.setRequestsPerSecond(0); testRequest.setRequestsPerSecond(0);
assertThat((double) action.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f)); assertThat((double) action.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f));
int total = between(0, 1000000); int total = between(0, 1000000);
task.rethrottle(1); testTask.rethrottle(1);
assertThat((double) action.perfectlyThrottledBatchTime(total), assertThat((double) action.perfectlyThrottledBatchTime(total),
closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1))); closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1)));
} }
@ -389,7 +399,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
action.setScroll(scrollId()); action.setScroll(scrollId());
// We'd like to get about 1 request a second // We'd like to get about 1 request a second
task.rethrottle(1f); testTask.rethrottle(1f);
// Make the last scroll look nearly instant // Make the last scroll look nearly instant
action.setLastBatchStartTime(System.nanoTime()); action.setLastBatchStartTime(System.nanoTime());
// The last batch had 100 documents // The last batch had 100 documents
@ -409,11 +419,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
// Running the command ought to increment the delay counter on the task. // Running the command ought to increment the delay counter on the task.
capturedCommand.get().run(); capturedCommand.get().run();
assertEquals(capturedDelay.get(), task.getStatus().getThrottled()); assertEquals(capturedDelay.get(), testTask.getStatus().getThrottled());
} }
private long retryTestCase(boolean failWithRejection) throws Exception { private long retryTestCase(boolean failWithRejection) throws Exception {
int totalFailures = randomIntBetween(1, mainRequest.getMaxRetries()); int totalFailures = randomIntBetween(1, testRequest.getMaxRetries());
int size = randomIntBetween(1, 100); int size = randomIntBetween(1, 100);
int retryAttempts = totalFailures - (failWithRejection ? 1 : 0); int retryAttempts = totalFailures - (failWithRejection ? 1 : 0);
@ -487,7 +497,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
private void refreshTestCase(Boolean refresh, boolean addDestinationIndexes, boolean shouldRefresh) { private void refreshTestCase(Boolean refresh, boolean addDestinationIndexes, boolean shouldRefresh) {
if (refresh != null) { if (refresh != null) {
mainRequest.setRefresh(refresh); testRequest.setRefresh(refresh);
} }
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
if (addDestinationIndexes) { if (addDestinationIndexes) {
@ -526,7 +536,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
public void testCancelBeforeStartNormalTermination() throws Exception { public void testCancelBeforeStartNormalTermination() throws Exception {
// Refresh or not doesn't matter - we don't try to refresh. // Refresh or not doesn't matter - we don't try to refresh.
mainRequest.setRefresh(usually()); testRequest.setRefresh(usually());
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList(), false)); cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList(), false));
assertNull("No refresh was attempted", client.lastRefreshRequest.get()); assertNull("No refresh was attempted", client.lastRefreshRequest.get());
} }
@ -554,7 +564,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
* that good stuff. * that good stuff.
*/ */
if (delay.nanos() > 0) { if (delay.nanos() > 0) {
generic().execute(() -> taskManager.cancel(task, reason, (Set<String> s) -> {})); generic().execute(() -> taskManager.cancel(testTask, reason, (Set<String> s) -> {}));
} }
return super.schedule(delay, name, command); return super.schedule(delay, name, command);
} }
@ -587,7 +597,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
action.setScroll(scrollId()); action.setScroll(scrollId());
} }
String reason = randomSimpleString(random()); String reason = randomSimpleString(random());
taskManager.cancel(task, reason, (Set<String> s) -> {}); taskManager.cancel(testTask, reason, (Set<String> s) -> {});
testMe.accept(action); testMe.accept(action);
assertEquals(reason, listener.get().getReasonCancelled()); assertEquals(reason, listener.get().getReasonCancelled());
if (previousScrollSet) { if (previousScrollSet) {
@ -599,8 +609,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
private class DummyAbstractAsyncBulkByScrollAction private class DummyAbstractAsyncBulkByScrollAction
extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, BulkIndexByScrollResponse> { extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, BulkIndexByScrollResponse> {
public DummyAbstractAsyncBulkByScrollAction() { public DummyAbstractAsyncBulkByScrollAction() {
super(AsyncBulkByScrollActionTests.this.task, logger, client, threadPool, super(testTask, logger, new ParentTaskAssigningClient(client, localNode, testTask), threadPool, testRequest, firstSearchRequest,
AsyncBulkByScrollActionTests.this.mainRequest, firstSearchRequest, listener); listener);
} }
@Override @Override
@ -641,6 +651,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute( RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) { Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
lastHeaders.set(threadPool.getThreadContext().getHeaders()); lastHeaders.set(threadPool.getThreadContext().getHeaders());
if (request instanceof ClearScrollRequest) {
assertEquals(TaskId.EMPTY_TASK_ID, request.getParentTask());
} else {
assertEquals(taskId, request.getParentTask());
}
if (request instanceof RefreshRequest) { if (request instanceof RefreshRequest) {
lastRefreshRequest.set((RefreshRequest) request); lastRefreshRequest.set((RefreshRequest) request);
listener.onResponse(null); listener.onResponse(null);

View File

@ -67,8 +67,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
@Override @Override
protected TransportReindexAction.AsyncIndexBySearchAction action() { protected TransportReindexAction.AsyncIndexBySearchAction action() {
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool, request(), listener());
, request(), listener());
} }
@Override @Override

View File

@ -134,7 +134,6 @@ public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScri
@Override @Override
protected AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> action() { protected AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> action() {
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool, request(), listener());
threadPool, request(), listener());
} }
} }

View File

@ -50,7 +50,6 @@ public class UpdateByQueryWithScriptTests
@Override @Override
protected AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> action() { protected AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> action() {
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, null, request(), listener());
null, request(), listener());
} }
} }