mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-09 06:25:07 +00:00
Specify reason whenever async search gets cancelled (#57761)
This allows to trace where the cancel tasks request came from given that it may be triggered for multiple reasons.
This commit is contained in:
parent
619e4f8c02
commit
06ef3042c1
@ -134,9 +134,9 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
|
|||||||
/**
|
/**
|
||||||
* Cancels the running task and its children.
|
* Cancels the running task and its children.
|
||||||
*/
|
*/
|
||||||
public void cancelTask(Runnable runnable) {
|
public void cancelTask(Runnable runnable, String reason) {
|
||||||
if (isCancelled() == false && isCancelling.compareAndSet(false, true)) {
|
if (isCancelled() == false && isCancelling.compareAndSet(false, true)) {
|
||||||
CancelTasksRequest req = new CancelTasksRequest().setTaskId(searchId.getTaskId());
|
CancelTasksRequest req = new CancelTasksRequest().setTaskId(searchId.getTaskId()).setReason(reason);
|
||||||
client.admin().cluster().cancelTasks(req, new ActionListener<CancelTasksResponse>() {
|
client.admin().cluster().cancelTasks(req, new ActionListener<CancelTasksResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(CancelTasksResponse cancelTasksResponse) {
|
public void onResponse(CancelTasksResponse cancelTasksResponse) {
|
||||||
@ -316,8 +316,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
|
|||||||
return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
|
return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// checks if the search task should be cancelled
|
// checks if the search task should be cancelled
|
||||||
private synchronized void checkCancellation() {
|
private synchronized void checkCancellation() {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
@ -326,7 +324,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
|
|||||||
// we cancel the search task if the initial submit task was cancelled,
|
// we cancel the search task if the initial submit task was cancelled,
|
||||||
// this is needed because the task cancellation mechanism doesn't
|
// this is needed because the task cancellation mechanism doesn't
|
||||||
// handle the cancellation of grand-children.
|
// handle the cancellation of grand-children.
|
||||||
cancelTask(() -> {});
|
cancelTask(() -> {}, checkSubmitCancellation.getAsBoolean() ? "submit was cancelled" : "async search has expired");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ public class TransportDeleteAsyncSearchAction extends HandledTransportAction<Del
|
|||||||
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc);
|
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc);
|
||||||
listener.onFailure(exc);
|
listener.onFailure(exc);
|
||||||
}
|
}
|
||||||
})));
|
})), "cancelled by user");
|
||||||
} else {
|
} else {
|
||||||
// the task was not found (already cancelled, already completed, or invalid id?)
|
// the task was not found (already cancelled, already completed, or invalid id?)
|
||||||
// we fail if the response is not found in the index
|
// we fail if the response is not found in the index
|
||||||
|
@ -91,7 +91,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
|||||||
// the user cancelled the submit so we don't store anything
|
// the user cancelled the submit so we don't store anything
|
||||||
// and propagate the failure
|
// and propagate the failure
|
||||||
Exception cause = new TaskCancelledException(submitTask.getReasonCancelled());
|
Exception cause = new TaskCancelledException(submitTask.getReasonCancelled());
|
||||||
onFatalFailure(searchTask, cause, searchResponse.isRunning(), submitListener);
|
onFatalFailure(searchTask, cause, searchResponse.isRunning(),
|
||||||
|
"submit task is cancelled", submitListener);
|
||||||
} else {
|
} else {
|
||||||
final String docId = searchTask.getExecutionId().getDocId();
|
final String docId = searchTask.getExecutionId().getDocId();
|
||||||
// creates the fallback response if the node crashes/restarts in the middle of the request
|
// creates the fallback response if the node crashes/restarts in the middle of the request
|
||||||
@ -117,12 +118,13 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception exc) {
|
public void onFailure(Exception exc) {
|
||||||
onFatalFailure(searchTask, exc, searchResponse.isRunning(), submitListener);
|
onFatalFailure(searchTask, exc, searchResponse.isRunning(),
|
||||||
|
"unable to store initial response", submitListener);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (Exception exc) {
|
} catch (Exception exc) {
|
||||||
onFatalFailure(searchTask, exc, searchResponse.isRunning(), submitListener);
|
onFatalFailure(searchTask, exc, searchResponse.isRunning(), "generic error", submitListener);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// the task completed within the timeout so the response is sent back to the user
|
// the task completed within the timeout so the response is sent back to the user
|
||||||
@ -157,7 +159,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
|||||||
return searchRequest;
|
return searchRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shouldCancel, ActionListener<AsyncSearchResponse> listener) {
|
private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shouldCancel, String cancelReason,
|
||||||
|
ActionListener<AsyncSearchResponse> listener) {
|
||||||
if (shouldCancel && task.isCancelled() == false) {
|
if (shouldCancel && task.isCancelled() == false) {
|
||||||
task.cancelTask(() -> {
|
task.cancelTask(() -> {
|
||||||
try {
|
try {
|
||||||
@ -165,7 +168,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
|
|||||||
} finally {
|
} finally {
|
||||||
listener.onFailure(error);
|
listener.onFailure(error);
|
||||||
}
|
}
|
||||||
});
|
}, "fatal failure: " + cancelReason);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
task.addCompletionListener(finalResponse -> taskManager.unregister(task));
|
task.addCompletionListener(finalResponse -> taskManager.unregister(task));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user