HBASE-27491 Do not clear cache on RejectedExecutionException (#4914)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
Bri Augenreich 2022-12-09 08:45:40 -05:00 committed by Bryan Beaudreault
parent cc86844b8a
commit f1bfda1aa3
1 changed files with 18 additions and 8 deletions

View File

@ -219,13 +219,13 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
} catch (IOException e) { } catch (IOException e) {
// The service itself failed . It may be an error coming from the communication // The service itself failed . It may be an error coming from the communication
// layer, but, as well, a functional error raised by the server. // layer, but, as well, a functional error raised by the server.
receiveGlobalFailure(multiAction, server, numAttempt, e); receiveGlobalFailure(multiAction, server, numAttempt, e, true);
return; return;
} catch (Throwable t) { } catch (Throwable t) {
// This should not happen. Let's log & retry anyway. // This should not happen. Let's log & retry anyway.
LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected." LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected."
+ " Retrying. Server=" + server + ", tableName=" + tableName, t); + " Retrying. Server=" + server + ", tableName=" + tableName, t);
receiveGlobalFailure(multiAction, server, numAttempt, t); receiveGlobalFailure(multiAction, server, numAttempt, t, true);
return; return;
} }
if (res.type() == AbstractResponse.ResponseType.MULTI) { if (res.type() == AbstractResponse.ResponseType.MULTI) {
@ -521,6 +521,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
*/ */
void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt, void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt,
List<Action> actionsForReplicaThread, boolean reuseThread) { List<Action> actionsForReplicaThread, boolean reuseThread) {
boolean clearServerCache = true;
// Run the last item on the same thread if we are already on a send thread. // Run the last item on the same thread if we are already on a send thread.
// We hope most of the time it will be the only item, so we can cut down on threads. // We hope most of the time it will be the only item, so we can cut down on threads.
int actionsRemaining = actionsByServer.size(); int actionsRemaining = actionsByServer.size();
@ -555,6 +556,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
// let's secure this a little. // let's secure this a little.
LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + " Server=" LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + " Server="
+ server.getServerName(), t); + server.getServerName(), t);
// Do not update cache if exception is from failing to submit action to thread pool
clearServerCache = false;
} else { } else {
// see #HBASE-14359 for more details // see #HBASE-14359 for more details
LOG.warn("Caught unexpected exception/error: ", t); LOG.warn("Caught unexpected exception/error: ", t);
@ -562,7 +565,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
asyncProcess.decTaskCounters(multiAction.getRegions(), server); asyncProcess.decTaskCounters(multiAction.getRegions(), server);
// We're likely to fail again, but this will increment the attempt counter, // We're likely to fail again, but this will increment the attempt counter,
// so it will finish. // so it will finish.
receiveGlobalFailure(multiAction, server, numAttempt, t); receiveGlobalFailure(multiAction, server, numAttempt, t, clearServerCache);
} }
} }
} }
@ -717,11 +720,15 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* @param t the throwable (if any) that caused the resubmit * @param t the throwable (if any) that caused the resubmit
*/ */
private void receiveGlobalFailure(MultiAction rsActions, ServerName server, int numAttempt, private void receiveGlobalFailure(MultiAction rsActions, ServerName server, int numAttempt,
Throwable t) { Throwable t, boolean clearServerCache) {
errorsByServer.reportServerError(server); errorsByServer.reportServerError(server);
Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
// Do not update cache if exception is from failing to submit action to thread pool
if (clearServerCache) {
cleanServerCache(server, t); cleanServerCache(server, t);
}
int failed = 0; int failed = 0;
int stopped = 0; int stopped = 0;
List<Action> toReplay = new ArrayList<>(); List<Action> toReplay = new ArrayList<>();
@ -729,9 +736,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
byte[] regionName = e.getKey(); byte[] regionName = e.getKey();
byte[] row = e.getValue().get(0).getAction().getRow(); byte[] row = e.getValue().get(0).getAction().getRow();
// Do not use the exception for updating cache because it might be coming from // Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction. // any of the regions in the MultiAction and do not update cache if exception is
// from failing to submit action to thread pool
if (clearServerCache) {
updateCachedLocations(server, regionName, row, updateCachedLocations(server, regionName, row,
ClientExceptionsUtil.isMetaClearingException(t) ? null : t); ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
}
for (Action action : e.getValue()) { for (Action action : e.getValue()) {
Retry retry = Retry retry =
manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server); manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server);