Closes #64056
This commit is contained in:
Tanguy Leroux 2020-11-10 11:06:25 +01:00 committed by GitHub
parent 21a6a11550
commit 09ff421d4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 31 deletions

View File

@ -175,6 +175,8 @@ import org.elasticsearch.client.ml.job.config.JobUpdate;
import org.elasticsearch.client.ml.job.config.MlFilter;
import org.elasticsearch.client.ml.job.process.ModelSnapshot;
import org.elasticsearch.client.ml.job.stats.JobStats;
import org.elasticsearch.client.tasks.GetTaskRequest;
import org.elasticsearch.client.tasks.GetTaskResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -187,6 +189,7 @@ import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.TaskId;
import org.junit.After;
import java.io.IOException;
@ -198,6 +201,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -299,7 +303,18 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync);
assertNull(response.getAcknowledged());
assertNotNull(response.getTask());
final TaskId taskId = response.getTask();
assertNotNull(taskId);
// When wait_for_completion=false the DeleteJobAction stored the task result in the .tasks index. In tests we need to wait
// for the delete job task to complete, otherwise the .tasks index could be created during the execution of a following test.
final GetTaskRequest taskRequest = new GetTaskRequest(taskId.getNodeId(), taskId.getId());
assertBusy(() -> {
Optional<GetTaskResponse> taskResponse = highLevelClient().tasks().get(taskRequest, RequestOptions.DEFAULT);
assertTrue(taskResponse.isPresent());
assertTrue(taskResponse.get().isCompleted());
}, 30L, TimeUnit.SECONDS);
}
public void testOpenJob() throws Exception {

View File

@ -30,13 +30,14 @@ import org.elasticsearch.client.tasks.CancelTasksResponse;
import org.elasticsearch.client.tasks.GetTaskRequest;
import org.elasticsearch.client.tasks.GetTaskResponse;
import org.elasticsearch.client.tasks.TaskId;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import static java.util.Collections.emptyList;
@ -70,11 +71,10 @@ public class TasksIT extends ESRestHighLevelClientTestCase {
}
assertTrue("List tasks were not found", listTasksFound);
}
public void testGetValidTask() throws Exception {
// Run a Reindex to create a task
final String sourceIndex = "source1";
final String destinationIndex = "dest";
Settings settings = Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).build();
@ -85,34 +85,19 @@ public class TasksIT extends ESRestHighLevelClientTestCase {
.add(new IndexRequest(sourceIndex).id("2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
assertEquals(RestStatus.OK, highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status());
// (need to use low level client because currently high level client
// doesn't support async return of task id - needs
// https://github.com/elastic/elasticsearch/pull/35202 )
RestClient lowClient = highLevelClient().getLowLevelClient();
Request request = new Request("POST", "_reindex");
request.addParameter("wait_for_completion", "false");
request.setJsonEntity(
"{"
+ " \"source\": {\n"
+ " \"index\": \"source1\"\n"
+ " },\n"
+ " \"dest\": {\n"
+ " \"index\": \"dest\"\n"
+ " }"
+ "}"
);
Response response = lowClient.performRequest(request);
Map<String, Object> map = entityAsMap(response);
Object taskId = map.get("task");
final ReindexRequest reindexRequest = new ReindexRequest().setSourceIndices(sourceIndex).setDestIndex(destinationIndex);
final TaskSubmissionResponse taskSubmissionResponse = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT);
final String taskId = taskSubmissionResponse.getTask();
assertNotNull(taskId);
TaskId childTaskId = new TaskId(taskId.toString());
TaskId childTaskId = new TaskId(taskId);
GetTaskRequest gtr = new GetTaskRequest(childTaskId.getNodeId(), childTaskId.getId());
gtr.setWaitForCompletion(randomBoolean());
Optional<GetTaskResponse> getTaskResponse = execute(gtr, highLevelClient().tasks()::get, highLevelClient().tasks()::getAsync);
assertTrue(getTaskResponse.isPresent());
GetTaskResponse taskResponse = getTaskResponse.get();
GetTaskResponse taskResponse = getTaskResponse.get();
if (gtr.getWaitForCompletion()) {
assertTrue(taskResponse.isCompleted());
}
@ -121,15 +106,15 @@ public class TasksIT extends ESRestHighLevelClientTestCase {
assertEquals("reindex from [source1] to [dest][_doc]", info.getDescription());
assertEquals("indices:data/write/reindex", info.getAction());
if (taskResponse.isCompleted() == false) {
assertBusy(checkTaskCompletionStatus(client(), taskId.toString()));
assertBusy(checkTaskCompletionStatus(client(), taskId));
}
}
}
public void testGetInvalidTask() throws IOException {
// Check 404s are returned as empty Optionals
GetTaskRequest gtr = new GetTaskRequest("doesNotExistNodeName", 123);
GetTaskRequest gtr = new GetTaskRequest("doesNotExistNodeName", 123);
Optional<GetTaskResponse> getTaskResponse = execute(gtr, highLevelClient().tasks()::get, highLevelClient().tasks()::getAsync);
assertFalse(getTaskResponse.isPresent());
assertFalse(getTaskResponse.isPresent());
}
public void testCancelTasks() throws IOException {