diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index acf77aec9b3..ae0df2740e0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -175,6 +175,8 @@ import org.elasticsearch.client.ml.job.config.JobUpdate; import org.elasticsearch.client.ml.job.config.MlFilter; import org.elasticsearch.client.ml.job.process.ModelSnapshot; import org.elasticsearch.client.ml.job.stats.JobStats; +import org.elasticsearch.client.tasks.GetTaskRequest; +import org.elasticsearch.client.tasks.GetTaskResponse; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -187,6 +189,7 @@ import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.tasks.TaskId; import org.junit.After; import java.io.IOException; @@ -198,6 +201,7 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -299,7 +303,18 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase { DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync); assertNull(response.getAcknowledged()); - assertNotNull(response.getTask()); + + final TaskId taskId = response.getTask(); + assertNotNull(taskId); + + // When wait_for_completion=false the DeleteJobAction stored the task result in the .tasks index. In tests we need to wait + // for the delete job task to complete, otherwise the .tasks index could be created during the execution of a following test. + final GetTaskRequest taskRequest = new GetTaskRequest(taskId.getNodeId(), taskId.getId()); + assertBusy(() -> { + Optional taskResponse = highLevelClient().tasks().get(taskRequest, RequestOptions.DEFAULT); + assertTrue(taskResponse.isPresent()); + assertTrue(taskResponse.get().isCompleted()); + }, 30L, TimeUnit.SECONDS); } public void testOpenJob() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java index d1e386bca0d..264437226bf 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java @@ -30,13 +30,14 @@ import org.elasticsearch.client.tasks.CancelTasksResponse; import org.elasticsearch.client.tasks.GetTaskRequest; import org.elasticsearch.client.tasks.GetTaskResponse; import org.elasticsearch.client.tasks.TaskId; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.RestStatus; import java.io.IOException; import java.util.Collections; -import java.util.Map; import java.util.Optional; import static java.util.Collections.emptyList; @@ -70,11 +71,10 @@ public class TasksIT extends ESRestHighLevelClientTestCase { } assertTrue("List tasks were not found", listTasksFound); } - + public void testGetValidTask() throws Exception { // Run a Reindex to create a task - final String sourceIndex = "source1"; final String destinationIndex = "dest"; Settings settings = Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).build(); @@ -85,34 +85,19 @@ public class TasksIT extends ESRestHighLevelClientTestCase { .add(new IndexRequest(sourceIndex).id("2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) .setRefreshPolicy(RefreshPolicy.IMMEDIATE); assertEquals(RestStatus.OK, highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status()); - - // (need to use low level client because currently high level client - // doesn't support async return of task id - needs - // https://github.com/elastic/elasticsearch/pull/35202 ) - RestClient lowClient = highLevelClient().getLowLevelClient(); - Request request = new Request("POST", "_reindex"); - request.addParameter("wait_for_completion", "false"); - request.setJsonEntity( - "{" - + " \"source\": {\n" - + " \"index\": \"source1\"\n" - + " },\n" - + " \"dest\": {\n" - + " \"index\": \"dest\"\n" - + " }" - + "}" - ); - Response response = lowClient.performRequest(request); - Map map = entityAsMap(response); - Object taskId = map.get("task"); + + final ReindexRequest reindexRequest = new ReindexRequest().setSourceIndices(sourceIndex).setDestIndex(destinationIndex); + final TaskSubmissionResponse taskSubmissionResponse = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); + + final String taskId = taskSubmissionResponse.getTask(); assertNotNull(taskId); - TaskId childTaskId = new TaskId(taskId.toString()); + TaskId childTaskId = new TaskId(taskId); GetTaskRequest gtr = new GetTaskRequest(childTaskId.getNodeId(), childTaskId.getId()); gtr.setWaitForCompletion(randomBoolean()); Optional getTaskResponse = execute(gtr, highLevelClient().tasks()::get, highLevelClient().tasks()::getAsync); assertTrue(getTaskResponse.isPresent()); - GetTaskResponse taskResponse = getTaskResponse.get(); + GetTaskResponse taskResponse = getTaskResponse.get(); if (gtr.getWaitForCompletion()) { assertTrue(taskResponse.isCompleted()); } @@ -121,15 +106,15 @@ public class TasksIT extends ESRestHighLevelClientTestCase { assertEquals("reindex from [source1] to [dest][_doc]", info.getDescription()); assertEquals("indices:data/write/reindex", info.getAction()); if (taskResponse.isCompleted() == false) { - assertBusy(checkTaskCompletionStatus(client(), taskId.toString())); + assertBusy(checkTaskCompletionStatus(client(), taskId)); } - } - + } + public void testGetInvalidTask() throws IOException { // Check 404s are returned as empty Optionals - GetTaskRequest gtr = new GetTaskRequest("doesNotExistNodeName", 123); + GetTaskRequest gtr = new GetTaskRequest("doesNotExistNodeName", 123); Optional getTaskResponse = execute(gtr, highLevelClient().tasks()::get, highLevelClient().tasks()::getAsync); - assertFalse(getTaskResponse.isPresent()); + assertFalse(getTaskResponse.isPresent()); } public void testCancelTasks() throws IOException {