[TEST] wait until reindex tasks ready for rethrottle (#26250)

When slices is set as auto, there's an additional network call
needed for the reindex tasks to know how to rethrottle. Sometimes
the rethrottle action happens before the reindex task is fully
initialized, so in the test we wait for the task to be ready.

This commit also adds some safeguards to ensure that
cancel and rethrottle operations are handled correctly

Closes #26192
This commit is contained in:
Andy Bristol 2017-08-18 11:01:27 -07:00 committed by GitHub
parent c0dbd236c3
commit 6eef6c4f7a
3 changed files with 44 additions and 13 deletions

View File

@ -59,8 +59,8 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
*/ */
public class BulkByScrollTask extends CancellableTask { public class BulkByScrollTask extends CancellableTask {
private LeaderBulkByScrollTaskState leaderState; private volatile LeaderBulkByScrollTaskState leaderState;
private WorkerBulkByScrollTaskState workerState; private volatile WorkerBulkByScrollTaskState workerState;
public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) { public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId); super(id, type, action, description, parentTaskId);
@ -155,6 +155,9 @@ public class BulkByScrollTask extends CancellableTask {
} }
workerState = new WorkerBulkByScrollTaskState(this, sliceId, requestsPerSecond); workerState = new WorkerBulkByScrollTaskState(this, sliceId, requestsPerSecond);
if (isCancelled()) {
workerState.handleCancel();
}
} }
/** /**
@ -170,12 +173,14 @@ public class BulkByScrollTask extends CancellableTask {
@Override @Override
public void onCancelled() { public void onCancelled() {
if (isLeader()) { /*
// The task cancellation task automatically finds children and cancels them, nothing extra to do * If this task is a leader, we don't need to do anything extra because the cancel action cancels child tasks for us
} else if (isWorker()) { * If it's is a worker, we know how to cancel it here
* If we don't know whether it's a leader or worker yet, we do nothing here. If the task is later set to be a worker, we cancel the
* worker at that time.
*/
if (isWorker()) {
workerState.handleCancel(); workerState.handleCancel();
} else {
throw new IllegalStateException("This task has not had its sliced state initialized and doesn't know how to cancel itself");
} }
} }

View File

@ -70,7 +70,8 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
return; return;
} }
throw new IllegalArgumentException("task [" + task.getId() + "] must be set as a child or parent"); throw new IllegalArgumentException("task [" + task.getId() + "] has not yet been initialized to the point where it knows how to " +
"rethrottle itself");
} }
private static void rethrottleParentTask(Logger logger, String localNodeId, Client client, BulkByScrollTask task, private static void rethrottleParentTask(Logger logger, String localNodeId, Client client, BulkByScrollTask task,

View File

@ -19,7 +19,7 @@
package org.elasticsearch.index.reindex; package org.elasticsearch.index.reindex;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.both;
@ -46,7 +47,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
* too but this is the only place that tests running against multiple nodes so it is the only integration tests that checks for * too but this is the only place that tests running against multiple nodes so it is the only integration tests that checks for
* serialization. * serialization.
*/ */
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/26192")
public class RethrottleTests extends ReindexTestCase { public class RethrottleTests extends ReindexTestCase {
public void testReindex() throws Exception { public void testReindex() throws Exception {
@ -116,9 +116,7 @@ public class RethrottleTests extends ReindexTestCase {
// Now rethrottle it so it'll finish // Now rethrottle it so it'll finish
float newRequestsPerSecond = randomBoolean() ? Float.POSITIVE_INFINITY : between(1, 1000) * 100000; // No throttle or "very fast" float newRequestsPerSecond = randomBoolean() ? Float.POSITIVE_INFINITY : between(1, 1000) * 100000; // No throttle or "very fast"
ListTasksResponse rethrottleResponse = rethrottle().setTaskId(taskToRethrottle).setRequestsPerSecond(newRequestsPerSecond).get(); ListTasksResponse rethrottleResponse = rethrottleTask(taskToRethrottle, newRequestsPerSecond);
rethrottleResponse.rethrowFailures("Rethrottle");
assertThat(rethrottleResponse.getTasks(), hasSize(1));
BulkByScrollTask.Status status = (BulkByScrollTask.Status) rethrottleResponse.getTasks().get(0).getStatus(); BulkByScrollTask.Status status = (BulkByScrollTask.Status) rethrottleResponse.getTasks().get(0).getStatus();
// Now check the resulting requests per second. // Now check the resulting requests per second.
@ -174,6 +172,33 @@ public class RethrottleTests extends ReindexTestCase {
response.getBatches(), greaterThanOrEqualTo(numSlices)); response.getBatches(), greaterThanOrEqualTo(numSlices));
} }
private ListTasksResponse rethrottleTask(TaskId taskToRethrottle, float newRequestsPerSecond) throws Exception {
// the task isn't ready to be rethrottled until it has figured out how many slices it will use. if we rethrottle when the task is
// in this state, the request will fail. so we try a few times
AtomicReference<ListTasksResponse> response = new AtomicReference<>();
assertBusy(() -> {
try {
ListTasksResponse rethrottleResponse = rethrottle()
.setTaskId(taskToRethrottle)
.setRequestsPerSecond(newRequestsPerSecond)
.get();
rethrottleResponse.rethrowFailures("Rethrottle");
assertThat(rethrottleResponse.getTasks(), hasSize(1));
response.set(rethrottleResponse);
} catch (ElasticsearchException e) {
// if it's the error we're expecting, rethrow as AssertionError so awaitBusy doesn't exit early
if (e.getCause() instanceof IllegalArgumentException) {
throw new AssertionError("Rethrottle request for task [" + taskToRethrottle.getId() + "] failed", e);
} else {
throw e;
}
}
});
return response.get();
}
private TaskGroup findTaskToRethrottle(String actionName, int sliceCount) { private TaskGroup findTaskToRethrottle(String actionName, int sliceCount) {
long start = System.nanoTime(); long start = System.nanoTime();
do { do {