Rest high level ReindexIT fix (#60834)

ReindexIT would rethrottle any delete or update by query task, fixed to
more precisely match the task started by the test.

Closes #60811
This commit is contained in:
Henning Andersen 2020-08-11 09:27:30 +02:00 committed by Henning Andersen
parent 54279212cf
commit a0b54b53fc
1 changed files with 10 additions and 6 deletions

View File

@ -50,6 +50,7 @@ import org.elasticsearch.tasks.TaskId;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -283,7 +284,7 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
} }
}); });
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME); TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME, updateByQueryRequest.getDescription());
float requestsPerSecond = 1000f; float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
@ -414,7 +415,7 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
} }
}); });
TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME); TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME, deleteByQueryRequest.getDescription());
float requestsPerSecond = 1000f; float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync); highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
@ -477,7 +478,7 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
} }
} }
private static TaskId findTaskToRethrottle(String actionName) throws IOException { private static TaskId findTaskToRethrottle(String actionName, String description) throws IOException {
long start = System.nanoTime(); long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest(); ListTasksRequest request = new ListTasksRequest();
request.setActions(actionName); request.setActions(actionName);
@ -485,13 +486,16 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
do { do {
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT); ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
list.rethrowFailures("Finding tasks to rethrottle"); list.rethrowFailures("Finding tasks to rethrottle");
List<TaskGroup> taskGroups =
list.getTaskGroups().stream()
.filter(taskGroup -> taskGroup.getTaskInfo().getDescription().equals(description)).collect(Collectors.toList());
assertThat("tasks are left over from the last execution of this test", assertThat("tasks are left over from the last execution of this test",
list.getTaskGroups(), hasSize(lessThan(2))); taskGroups, hasSize(lessThan(2)));
if (0 == list.getTaskGroups().size()) { if (0 == taskGroups.size()) {
// The parent task hasn't started yet // The parent task hasn't started yet
continue; continue;
} }
TaskGroup taskGroup = list.getTaskGroups().get(0); TaskGroup taskGroup = taskGroups.get(0);
assertThat(taskGroup.getChildTasks(), empty()); assertThat(taskGroup.getChildTasks(), empty());
return taskGroup.getTaskInfo().getTaskId(); return taskGroup.getTaskInfo().getTaskId();
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10)); } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));