Fetch result when wait_for_completion

This makes this sequence:
```
curl -XDELETE localhost:9200/source,dest?pretty
for i in $( seq 1 100 ); do
  curl -XPOST localhost:9200/source/test -d'{"test": "test"}'; echo
done
curl localhost:9200/_refresh?pretty

curl -XPOST 'localhost:9200/_reindex?pretty&wait_for_completion=false' -d'{
  "source": {
    "index": "source"
  },
  "dest": {
    "index": "dest"
  }
}'

curl 'localhost:9200/_tasks/Jsyd6d9wSRW-O-NiiKbPcQ:237?wait_for_completion&pretty'
```

Return task *AND* the response to the user.

This also renames "result" to "response" in the persisted task info
to line it up with how we name the objects in Elasticsearch.
This commit is contained in:
Nik Everett 2016-06-15 18:29:19 -04:00
parent 8078c205f9
commit 5f0292cb81
8 changed files with 152 additions and 64 deletions

View File

@ -41,6 +41,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.PersistedTaskInfo;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskPersistenceService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
@ -140,6 +141,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
Task runningTask = taskManager.getTask(request.getTaskId().getId());
if (runningTask == null) {
// Task isn't running, go look in the task index
getFinishedTaskFromIndex(thisTask, request, listener);
} else {
if (request.getWaitForCompletion()) {
@ -148,9 +150,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
@Override
protected void doRun() throws Exception {
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
// TODO look up the task's result from the .tasks index now that it is done
listener.onResponse(
new GetTaskResponse(new PersistedTaskInfo(runningTask.taskInfo(clusterService.localNode(), true))));
waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode(), true), listener);
}
@Override
@ -159,15 +159,44 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
}
});
} else {
listener.onResponse(new GetTaskResponse(new PersistedTaskInfo(runningTask.taskInfo(clusterService.localNode(), true))));
TaskInfo info = runningTask.taskInfo(clusterService.localNode(), true);
listener.onResponse(new GetTaskResponse(new PersistedTaskInfo(false, info)));
}
}
}
/**
* Send a {@link GetRequest} to the results index looking for the results of the task. It'll only be found only if the task's result was
* persisted. Called on the node that once had the task if that node is part of the cluster or on the coordinating node if the node
* wasn't part of the cluster.
* Called after waiting for the task to complete. Attempts to load the results of the task from the tasks index. If it isn't in the
* index then returns a snapshot of the task taken shortly after completion.
*/
void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask,
ActionListener<GetTaskResponse> listener) {
getFinishedTaskFromIndex(thisTask, request, new ActionListener<GetTaskResponse>() {
@Override
public void onResponse(GetTaskResponse response) {
// We were able to load the task from the task index. Let's send that back.
listener.onResponse(response);
}
@Override
public void onFailure(Throwable e) {
/*
* We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
* the error isn't a 404 then we'll just throw it back to the user.
*/
if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) {
listener.onResponse(new GetTaskResponse(new PersistedTaskInfo(true, snapshotOfRunningTask)));
} else {
listener.onFailure(e);
}
}
});
}
/**
* Send a {@link GetRequest} to the tasks index looking for a persisted copy of the task completed task. It'll only be found only if the
* task's result was persisted. Called on the node that once had the task if that node is still part of the cluster or on the
* coordinating node if the node is no longer part of the cluster.
*/
void getFinishedTaskFromIndex(Task thisTask, GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
GetRequest get = new GetRequest(TaskPersistenceService.TASK_INDEX, TaskPersistenceService.TASK_TYPE,
@ -202,6 +231,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
void onGetFinishedTaskFromIndex(GetResponse response, ActionListener<GetTaskResponse> listener) throws IOException {
if (false == response.isExists()) {
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running or persisted", response.getId()));
return;
}
if (response.isSourceEmpty()) {
listener.onFailure(new ElasticsearchException("Stored task status for [{}] didn't contain any source!", response.getId()));

View File

@ -38,6 +38,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
@ -45,56 +46,61 @@ import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap;
/**
* Information about a persisted or running task. Running tasks just have a {@link #getTask()} while persisted tasks will have either a
* {@link #getError()} or {@link #getResult()}.
* {@link #getError()} or {@link #getResponse()}.
*/
public final class PersistedTaskInfo implements Writeable, ToXContent {
private final boolean completed;
private final TaskInfo task;
@Nullable
private final BytesReference error;
@Nullable
private final BytesReference result;
private final BytesReference response;
/**
* Construct a {@linkplain PersistedTaskInfo} for a running task.
* Construct a {@linkplain PersistedTaskInfo} for a task for which we don't have a result or error. That usually means that the task
* is incomplete, but it could also mean that we waited for the task to complete but it didn't save any error information.
*/
public PersistedTaskInfo(TaskInfo task) {
this(task, null, null);
public PersistedTaskInfo(boolean completed, TaskInfo task) {
this(completed, task, null, null);
}
/**
* Construct a {@linkplain PersistedTaskInfo} for a task that completed with an error.
*/
public PersistedTaskInfo(TaskInfo task, Throwable error) throws IOException {
this(task, toXContent(error), null);
this(true, task, toXContent(error), null);
}
/**
* Construct a {@linkplain PersistedTaskInfo} for a task that completed successfully.
*/
public PersistedTaskInfo(TaskInfo task, ToXContent result) throws IOException {
this(task, null, toXContent(result));
public PersistedTaskInfo(TaskInfo task, ToXContent response) throws IOException {
this(true, task, null, toXContent(response));
}
private PersistedTaskInfo(TaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) {
private PersistedTaskInfo(boolean completed, TaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) {
this.completed = completed;
this.task = requireNonNull(task, "task is required");
this.error = error;
this.result = result;
this.response = result;
}
/**
* Read from a stream.
*/
public PersistedTaskInfo(StreamInput in) throws IOException {
completed = in.readBoolean();
task = new TaskInfo(in);
error = in.readOptionalBytesReference();
result = in.readOptionalBytesReference();
response = in.readOptionalBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(completed);
task.writeTo(out);
out.writeOptionalBytesReference(error);
out.writeOptionalBytesReference(result);
out.writeOptionalBytesReference(response);
}
/**
@ -105,46 +111,45 @@ public final class PersistedTaskInfo implements Writeable, ToXContent {
}
/**
* Get the error that finished this task. Will return null if the task didn't finish with an error or it hasn't yet finished.
* Get the error that finished this task. Will return null if the task didn't finish with an error, it hasn't yet finished, or didn't
* persist its result.
*/
public BytesReference getError() {
return error;
}
/**
* Convert {@link #getError()} from XContent to a Map for easy processing. Will return null if the task didn't finish with an error or
* hasn't yet finished.
* Convert {@link #getError()} from XContent to a Map for easy processing. Will return an empty map if the task didn't finish with an
* error, hasn't yet finished, or didn't persist its result.
*/
public Map<String, Object> getErrorAsMap() {
if (error == null) {
return null;
return emptyMap();
}
return convertToMap(error, false).v2();
}
/**
* Get the result that this task finished with. Will return null if the task was finished by an error or it hasn't yet finished.
* Get the response that this task finished with. Will return null if the task was finished by an error, it hasn't yet finished, or
* didn't persist its result.
*/
public BytesReference getResult() {
return result;
public BytesReference getResponse() {
return response;
}
/**
* Convert {@link #getResult()} from XContent to a Map for easy processing. Will return null if the task was finished with an error or
* hasn't yet finished.
* Convert {@link #getResponse()} from XContent to a Map for easy processing. Will return an empty map if the task was finished with an
* error, hasn't yet finished, or didn't persist its result.
*/
public Map<String, Object> getResultAsMap() {
if (result == null) {
return null;
public Map<String, Object> getResponseAsMap() {
if (response == null) {
return emptyMap();
}
return convertToMap(result, false).v2();
return convertToMap(response, false).v2();
}
/**
* Was the task completed before returned?
*/
public boolean isCompleted() {
return error != null || result != null;
return completed;
}
@Override
@ -159,18 +164,18 @@ public final class PersistedTaskInfo implements Writeable, ToXContent {
if (error != null) {
XContentHelper.writeRawField("error", error, builder, params);
}
if (result != null) {
XContentHelper.writeRawField("result", result, builder, params);
if (response != null) {
XContentHelper.writeRawField("response", response, builder, params);
}
return builder;
}
public static final ConstructingObjectParser<PersistedTaskInfo, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
"persisted_task_info", a -> new PersistedTaskInfo((TaskInfo) a[0], (BytesReference) a[1], (BytesReference) a[2]));
"persisted_task_info", a -> new PersistedTaskInfo(true, (TaskInfo) a[0], (BytesReference) a[1], (BytesReference) a[2]));
static {
PARSER.declareObject(constructorArg(), TaskInfo.PARSER, new ParseField("task"));
PARSER.declareRawObject(optionalConstructorArg(), new ParseField("error"));
PARSER.declareRawObject(optionalConstructorArg(), new ParseField("result"));
PARSER.declareRawObject(optionalConstructorArg(), new ParseField("response"));
}
@Override
@ -189,9 +194,10 @@ public final class PersistedTaskInfo implements Writeable, ToXContent {
* Equality of error and result is done by converting them to a map first. Not efficient but ignores field order and spacing
* differences so perfect for testing.
*/
return Objects.equals(task, other.task)
return Objects.equals(completed, other.completed)
&& Objects.equals(task, other.task)
&& Objects.equals(getErrorAsMap(), other.getErrorAsMap())
&& Objects.equals(getResultAsMap(), other.getResultAsMap());
&& Objects.equals(getResponseAsMap(), other.getResponseAsMap());
}
@Override
@ -200,7 +206,7 @@ public final class PersistedTaskInfo implements Writeable, ToXContent {
* Hashing of error and result is done by converting them to a map first. Not efficient but ignores field order and spacing
* differences so perfect for testing.
*/
return Objects.hash(task, getErrorAsMap(), getResultAsMap());
return Objects.hash(completed, task, getErrorAsMap(), getResponseAsMap());
}
private static BytesReference toXContent(ToXContent result) throws IOException {

View File

@ -37,7 +37,7 @@
}
}
},
"result" : {
"response" : {
"type" : "object",
"enabled" : false
},

View File

@ -70,7 +70,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
@ -85,6 +84,7 @@ import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -437,8 +437,8 @@ public class TasksIT extends ESIntegTestCase {
}
public void testListTasksWaitForCompletion() throws Exception {
waitForCompletionTestCase(id -> {
return client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]")
waitForCompletionTestCase(randomBoolean(), id -> {
return client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME)
.setWaitForCompletion(true).execute();
}, response -> {
assertThat(response.getNodeFailures(), empty());
@ -446,25 +446,39 @@ public class TasksIT extends ESIntegTestCase {
});
}
public void testGetTaskWaitForCompletion() throws Exception {
waitForCompletionTestCase(id -> {
public void testGetTaskWaitForCompletionNoPersist() throws Exception {
waitForCompletionTestCase(false, id -> {
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
}, response -> {
// Really we're just happy we didn't get any exceptions
assertNotNull(response.getTask().getTask());
assertTrue(response.getTask().isCompleted());
// We didn't persist the result so it won't come back when we wait
assertNull(response.getTask().getResponse());
});
}
public void testGetTaskWaitForCompletionWithPersist() throws Exception {
waitForCompletionTestCase(true, id -> {
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
}, response -> {
assertNotNull(response.getTask().getTask());
assertTrue(response.getTask().isCompleted());
// We persisted the task so we should get its results
assertEquals(0, response.getTask().getResponseAsMap().get("failure_count"));
});
}
/**
* Test wait for completion.
* @param persist should the task persist its results
* @param wait start waiting for a task. Accepts that id of the task to wait for and returns a future waiting for it.
* @param validator validate the response and return the task ids that were found
*/
private <T> void waitForCompletionTestCase(Function<TaskId, ListenableActionFuture<T>> wait, Consumer<T> validator)
private <T> void waitForCompletionTestCase(boolean persist, Function<TaskId, ListenableActionFuture<T>> wait, Consumer<T> validator)
throws Exception {
// Start blocking test task
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.execute();
.setShouldPersistResult(persist).execute();
ListenableActionFuture<T> waitResponseFuture;
TaskId taskId;
@ -513,7 +527,7 @@ public class TasksIT extends ESIntegTestCase {
public void testListTasksWaitForTimeout() throws Exception {
waitForTimeoutTestCase(id -> {
ListTasksResponse response = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(100))
.setActions(TestTaskPlugin.TestTaskAction.NAME).setWaitForCompletion(true).setTimeout(timeValueMillis(100))
.get();
assertThat(response.getNodeFailures(), not(empty()));
return response.getNodeFailures();
@ -539,6 +553,9 @@ public class TasksIT extends ESIntegTestCase {
try {
TaskId taskId = waitForTestTaskStartOnAllNodes();
// Wait for the task to start
assertBusy(() -> client().admin().cluster().prepareGetTask(taskId).get());
// Spin up a request that should wait for those tasks to finish
// It will timeout because we haven't unblocked the tasks
Iterable<? extends Throwable> failures = wait.apply(taskId);
@ -554,15 +571,18 @@ public class TasksIT extends ESIntegTestCase {
future.get();
}
/**
* Wait for the test task to be running on all nodes and return the TaskId of the primary task.
*/
private TaskId waitForTestTaskStartOnAllNodes() throws Exception {
AtomicReference<TaskId> result = new AtomicReference<>();
assertBusy(() -> {
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]")
.get().getTasks();
assertEquals(internalCluster().size(), tasks.size());
result.set(tasks.get(0).getTaskId());
});
return result.get();
List<TaskInfo> task = client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME).get().getTasks();
assertThat(task, hasSize(1));
return task.get(0).getTaskId();
}
public void testTasksListWaitForNoTask() throws Exception {
@ -626,7 +646,7 @@ public class TasksIT extends ESIntegTestCase {
assertEquals(Long.toString(taskInfo.getId()), task.get("id").toString());
@SuppressWarnings("unchecked")
Map<String, Object> result = (Map<String, Object>) source.get("result");
Map<String, Object> result = (Map<String, Object>) source.get("response");
assertEquals("0", result.get("failure_count").toString());
assertNull(source.get("failure"));
@ -647,7 +667,7 @@ public class TasksIT extends ESIntegTestCase {
assertEquals(1L, searchResponse.getHits().totalHits());
GetTaskResponse getResponse = expectFinishedTask(taskId);
assertEquals(result, getResponse.getTask().getResultAsMap());
assertEquals(result, getResponse.getTask().getResponseAsMap());
assertNull(getResponse.getTask().getError());
}
@ -688,7 +708,7 @@ public class TasksIT extends ESIntegTestCase {
assertNull(source.get("result"));
GetTaskResponse getResponse = expectFinishedTask(failedTaskId);
assertNull(getResponse.getTask().getResult());
assertNull(getResponse.getTask().getResponse());
assertEquals(error, getResponse.getTask().getErrorAsMap());
}
@ -728,7 +748,7 @@ public class TasksIT extends ESIntegTestCase {
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
assertEquals("test", response.getTask().getTask().getAction());
assertNotNull(response.getTask().getError());
assertNull(response.getTask().getResult());
assertNull(response.getTask().getResponse());
}
@Override

View File

@ -78,11 +78,11 @@ public class PersistedTaskInfoTests extends ESTestCase {
private static PersistedTaskInfo randomTaskResult() throws IOException {
switch (between(0, 2)) {
case 0:
return new PersistedTaskInfo(randomTaskInfo());
return new PersistedTaskInfo(randomBoolean(), randomTaskInfo());
case 1:
return new PersistedTaskInfo(randomTaskInfo(), new RuntimeException("error"));
case 2:
return new PersistedTaskInfo(randomTaskInfo(), randomTaskActionResult());
return new PersistedTaskInfo(randomTaskInfo(), randomTaskResponse());
default:
throw new UnsupportedOperationException("Unsupported random TaskResult constructor");
}
@ -117,7 +117,7 @@ public class PersistedTaskInfoTests extends ESTestCase {
}
}
private static ToXContent randomTaskActionResult() {
private static ToXContent randomTaskResponse() {
Map<String, String> result = new TreeMap<>();
int fields = between(0, 10);
for (int f = 0; f < fields; f++) {
@ -126,7 +126,7 @@ public class PersistedTaskInfoTests extends ESTestCase {
return new ToXContent() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// Results in Elasticsearch never output a leading startObject. There isn't really a good reason, they just don't.
// Responses in Elasticsearch never output a leading startObject. There isn't really a good reason, they just don't.
for (Map.Entry<String, String> entry : result.entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}

View File

@ -76,6 +76,17 @@
# The task will be in the response even if it finished before we got here
# because of task persistence.
- is_true: task
- is_false: response.timed_out
- match: {response.deleted: 1}
- is_false: response.created
- is_false: response.updated
- match: {response.version_conflicts: 0}
- match: {response.batches: 1}
- match: {response.failures: []}
- match: {response.noops: 0}
- match: {response.throttled_millis: 0}
- gte: { response.took: 0 }
- is_false: response.task
---
"Response for version conflict":

View File

@ -100,6 +100,15 @@
# The task will be in the response even if it finished before we got here
# because of task persistence.
- is_true: task
- match: {response.created: 1}
- match: {response.updated: 0}
- match: {response.version_conflicts: 0}
- match: {response.batches: 1}
- match: {response.failures: []}
- match: {response.throttled_millis: 0}
- gte: { response.took: 0 }
- is_false: response.task
- is_false: response.deleted
---
"Response format for version conflict":

View File

@ -60,6 +60,18 @@
# The task will be in the response even if it finished before we got here
# because of task persistence.
- is_true: task
- is_false: response.timed_out
- match: {response.updated: 1}
- match: {response.version_conflicts: 0}
- match: {response.batches: 1}
- match: {response.failures: []}
- match: {response.noops: 0}
- match: {response.throttled_millis: 0}
- gte: { response.took: 0 }
# Update by query can't create
- is_false: response.created
- is_false: response.task
- is_false: response.deleted
---
"Response for version conflict":