Submit _async search task should cancel children on cancellation (#58332)
This change allows the submit async search task to cancel children and removes the manual indirection that cancels the search task when the submit task is cancelled. This is now handled by the task cancellation, which can cancel grand-children since #54757.
This commit is contained in:
parent
88f1dab8b5
commit
fcd8a432d9
|
@ -36,7 +36,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BooleanSupplier;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -46,7 +45,6 @@ import static java.util.Collections.singletonList;
|
|||
* Task that tracks the progress of a currently running {@link SearchRequest}.
|
||||
*/
|
||||
final class AsyncSearchTask extends SearchTask implements AsyncTask {
|
||||
private final BooleanSupplier checkSubmitCancellation;
|
||||
private final AsyncExecutionId searchId;
|
||||
private final Client client;
|
||||
private final ThreadPool threadPool;
|
||||
|
@ -73,7 +71,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
|
|||
* @param type The type of the task.
|
||||
* @param action The action name.
|
||||
* @param parentTaskId The parent task id.
|
||||
* @param checkSubmitCancellation A boolean supplier that checks if the submit task has been cancelled.
|
||||
* @param originHeaders All the request context headers.
|
||||
* @param taskHeaders The filtered request headers for the task.
|
||||
* @param searchId The {@link AsyncExecutionId} of the task.
|
||||
|
@ -84,7 +81,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
|
|||
String type,
|
||||
String action,
|
||||
TaskId parentTaskId,
|
||||
BooleanSupplier checkSubmitCancellation,
|
||||
TimeValue keepAlive,
|
||||
Map<String, String> originHeaders,
|
||||
Map<String, String> taskHeaders,
|
||||
|
@ -93,7 +89,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
|
|||
ThreadPool threadPool,
|
||||
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
|
||||
super(id, type, action, "async_search", parentTaskId, taskHeaders);
|
||||
this.checkSubmitCancellation = checkSubmitCancellation;
|
||||
this.expirationTimeMillis = getStartTime() + keepAlive.getMillis();
|
||||
this.originHeaders = originHeaders;
|
||||
this.searchId = searchId;
|
||||
|
@ -319,12 +314,9 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
|
|||
// checks if the search task should be cancelled
|
||||
private synchronized void checkCancellation() {
|
||||
long now = System.currentTimeMillis();
|
||||
if (hasCompleted == false &&
|
||||
expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
|
||||
// we cancel the search task if the initial submit task was cancelled,
|
||||
// this is needed because the task cancellation mechanism doesn't
|
||||
// handle the cancellation of grand-children.
|
||||
cancelTask(() -> {}, checkSubmitCancellation.getAsBoolean() ? "submit was cancelled" : "async search has expired");
|
||||
if (hasCompleted == false && expirationTimeMillis < now) {
|
||||
// we cancel expired search task even if they are still running
|
||||
cancelTask(() -> {}, "async search has expired");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,9 +29,7 @@ import org.elasticsearch.index.engine.DocumentMissingException;
|
|||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskCancelledException;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
|
||||
|
@ -74,8 +72,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, SubmitAsyncSearchRequest request, ActionListener<AsyncSearchResponse> submitListener) {
|
||||
CancellableTask submitTask = (CancellableTask) task;
|
||||
protected void doExecute(Task submitTask, SubmitAsyncSearchRequest request, ActionListener<AsyncSearchResponse> submitListener) {
|
||||
final SearchRequest searchRequest = createSearchRequest(request, submitTask, request.getKeepAlive());
|
||||
AsyncSearchTask searchTask = (AsyncSearchTask) taskManager.register("transport", SearchAction.INSTANCE.name(), searchRequest);
|
||||
searchAction.execute(searchTask, searchRequest, searchTask.getSearchProgressActionListener());
|
||||
|
@ -87,42 +84,34 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
|||
// the task is still running and the user cannot wait more so we create
|
||||
// a document for further retrieval
|
||||
try {
|
||||
if (submitTask.isCancelled()) {
|
||||
// the user cancelled the submit so we don't store anything
|
||||
// and propagate the failure
|
||||
Exception cause = new TaskCancelledException(submitTask.getReasonCancelled());
|
||||
onFatalFailure(searchTask, cause, searchResponse.isRunning(),
|
||||
"submit task is cancelled", submitListener);
|
||||
} else {
|
||||
final String docId = searchTask.getExecutionId().getDocId();
|
||||
// creates the fallback response if the node crashes/restarts in the middle of the request
|
||||
// TODO: store intermediate results ?
|
||||
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
|
||||
store.storeInitialResponse(docId, searchTask.getOriginHeaders(), initialResp,
|
||||
new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse r) {
|
||||
if (searchResponse.isRunning()) {
|
||||
try {
|
||||
// store the final response on completion unless the submit is cancelled
|
||||
searchTask.addCompletionListener(finalResponse ->
|
||||
onFinalResponse(submitTask, searchTask, finalResponse, () -> {}));
|
||||
} finally {
|
||||
submitListener.onResponse(searchResponse);
|
||||
}
|
||||
} else {
|
||||
onFinalResponse(submitTask, searchTask, searchResponse,
|
||||
() -> submitListener.onResponse(searchResponse));
|
||||
final String docId = searchTask.getExecutionId().getDocId();
|
||||
// creates the fallback response if the node crashes/restarts in the middle of the request
|
||||
// TODO: store intermediate results ?
|
||||
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
|
||||
store.storeInitialResponse(docId, searchTask.getOriginHeaders(), initialResp,
|
||||
new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse r) {
|
||||
if (searchResponse.isRunning()) {
|
||||
try {
|
||||
// store the final response on completion unless the submit is cancelled
|
||||
searchTask.addCompletionListener(finalResponse ->
|
||||
onFinalResponse(searchTask, finalResponse, () -> {
|
||||
}));
|
||||
} finally {
|
||||
submitListener.onResponse(searchResponse);
|
||||
}
|
||||
} else {
|
||||
onFinalResponse(searchTask, searchResponse, () -> submitListener.onResponse(searchResponse));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exc) {
|
||||
onFatalFailure(searchTask, exc, searchResponse.isRunning(),
|
||||
"unable to store initial response", submitListener);
|
||||
}
|
||||
});
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Exception exc) {
|
||||
onFatalFailure(searchTask, exc, searchResponse.isRunning(),
|
||||
"unable to store initial response", submitListener);
|
||||
}
|
||||
});
|
||||
} catch (Exception exc) {
|
||||
onFatalFailure(searchTask, exc, searchResponse.isRunning(), "generic error", submitListener);
|
||||
}
|
||||
|
@ -141,7 +130,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
|||
}, request.getWaitForCompletionTimeout());
|
||||
}
|
||||
|
||||
private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, CancellableTask submitTask, TimeValue keepAlive) {
|
||||
private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, Task submitTask, TimeValue keepAlive) {
|
||||
String docID = UUIDs.randomBase64UUID();
|
||||
Map<String, String> originHeaders = nodeClient.threadPool().getThreadContext().getHeaders();
|
||||
SearchRequest searchRequest = new SearchRequest(request.getSearchRequest()) {
|
||||
|
@ -150,9 +139,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
|||
AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
|
||||
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
|
||||
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
|
||||
return new AsyncSearchTask(id, type, action, parentTaskId,
|
||||
submitTask::isCancelled, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(),
|
||||
nodeClient.threadPool(), aggReduceContextSupplier);
|
||||
return new AsyncSearchTask(id, type, action, parentTaskId, keepAlive,
|
||||
originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier);
|
||||
}
|
||||
};
|
||||
searchRequest.setParentTask(new TaskId(nodeClient.getLocalNodeId(), submitTask.getId()));
|
||||
|
@ -178,11 +166,10 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
|||
}
|
||||
}
|
||||
|
||||
private void onFinalResponse(CancellableTask submitTask,
|
||||
AsyncSearchTask searchTask,
|
||||
private void onFinalResponse(AsyncSearchTask searchTask,
|
||||
AsyncSearchResponse response,
|
||||
Runnable nextAction) {
|
||||
if (submitTask.isCancelled() || searchTask.isCancelled()) {
|
||||
if (searchTask.isCancelled()) {
|
||||
// the task was cancelled so we ensure that there is nothing stored in the response index.
|
||||
store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap(
|
||||
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
|
||||
|
|
|
@ -52,13 +52,13 @@ public class AsyncSearchTaskTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private AsyncSearchTask createAsyncSearchTask() {
|
||||
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1),
|
||||
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
|
||||
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
|
||||
new NoOpClient(threadPool), threadPool, null);
|
||||
}
|
||||
|
||||
public void testWaitForInit() throws InterruptedException {
|
||||
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1),
|
||||
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
|
||||
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
|
||||
new NoOpClient(threadPool), threadPool, null);
|
||||
int numShards = randomIntBetween(0, 10);
|
||||
|
|
|
@ -154,8 +154,7 @@ public class SubmitAsyncSearchRequest extends ActionRequest {
|
|||
return new CancellableTask(id, type, action, null, parentTaskId, headers) {
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
// we cancel the underlying search action explicitly in the submit action
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue