From c3973c9ae663c4650a9be885073e885e78d912bd Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Thu, 14 Mar 2019 10:12:58 -0700 Subject: [PATCH] HADOOP-16192. Fix CallQueue backoff bugs: perform backoff when add() is used and update backoff when refreshed. (cherry-picked from 8c95cb9d6bef369fef6a8364f0c0764eba90e44a) (cherry-picked from 0de8b55a095ada2b98c0a41899651bd8e524f42e) (cherry-picked from d4fbbc83ad8c4d818deccf62b4c54cead1d17a8f) (cherry-picked from e172fc62ce3fa10a36582bc6667edec931fea9fa) --- .../org/apache/hadoop/ipc/CallQueueManager.java | 11 ++++++++++- .../src/main/java/org/apache/hadoop/ipc/Server.java | 1 + .../org/apache/hadoop/ipc/TestCallQueueManager.java | 13 +++++++++++++ .../test/java/org/apache/hadoop/ipc/TestRPC.java | 6 ++++-- 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index d1bd1807b03..e73ef5371a8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -221,12 +221,21 @@ public class CallQueueManager } else if (shouldBackOff(e)) { throwBackoff(); } else { - add(e); + // No need to re-check backoff criteria since they were just checked + addInternal(e, false); } } @Override public boolean add(E e) { + return addInternal(e, true); + } + + @VisibleForTesting + boolean addInternal(E e, boolean checkBackoff) { + if (checkBackoff && isClientBackoffEnabled() && shouldBackOff(e)) { + throwBackoff(); + } try { return putRef.get().add(e); } catch (CallQueueOverflowException ex) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 1d5292c5f14..154877bf327 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -661,6 +661,7 @@ public abstract class Server { CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); callQueue.swapQueue(getSchedulerClass(prefix, conf), getQueueClass(prefix, conf), maxQueueSize, prefix, conf); + callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf)); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index a5a0b0008cb..2b739966d84 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -434,5 +434,18 @@ public class TestCallQueueManager { } verify(queue, times(0)).put(call); verify(queue, times(0)).add(call); + + // backoff is enabled, add + scheduler backoff = overflow exception. + reset(queue); + cqm.setClientBackoffEnabled(true); + doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call); + try { + cqm.add(call); + fail("didn't fail"); + } catch (Exception ex) { + assertTrue(ex.toString(), ex instanceof CallQueueOverflowException); + } + verify(queue, times(0)).put(call); + verify(queue, times(0)).add(call); } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index d08c4b4c33a..2f15d2752f7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -1135,7 +1135,8 @@ public class TestRPC extends TestRpcBase { return null; } })); - verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject()); + verify(spy, timeout(500).times(i + 1)).addInternal( + Mockito.anyObject(), Mockito.eq(false)); } try { proxy.sleep(null, newSleepRequest(100)); @@ -1206,7 +1207,8 @@ public class TestRPC extends TestRpcBase { return null; } })); - verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject()); + verify(spy, timeout(500).times(i + 1)).addInternal( + Mockito.anyObject(), Mockito.eq(false)); } // Start another sleep RPC call and verify the call is backed off due to // avg response time(3s) exceeds threshold (2s).