diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index d1bd1807b03..e73ef5371a8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -221,12 +221,21 @@ public class CallQueueManager } else if (shouldBackOff(e)) { throwBackoff(); } else { - add(e); + // No need to re-check backoff criteria since they were just checked + addInternal(e, false); } } @Override public boolean add(E e) { + return addInternal(e, true); + } + + @VisibleForTesting + boolean addInternal(E e, boolean checkBackoff) { + if (checkBackoff && isClientBackoffEnabled() && shouldBackOff(e)) { + throwBackoff(); + } try { return putRef.get().add(e); } catch (CallQueueOverflowException ex) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 1d5292c5f14..154877bf327 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -661,6 +661,7 @@ public abstract class Server { CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); callQueue.swapQueue(getSchedulerClass(prefix, conf), getQueueClass(prefix, conf), maxQueueSize, prefix, conf); + callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf)); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index a5a0b0008cb..2b739966d84 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -434,5 +434,18 @@ public class TestCallQueueManager { } verify(queue, times(0)).put(call); verify(queue, times(0)).add(call); + + // backoff is enabled, add + scheduler backoff = overflow exception. + reset(queue); + cqm.setClientBackoffEnabled(true); + doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call); + try { + cqm.add(call); + fail("didn't fail"); + } catch (Exception ex) { + assertTrue(ex.toString(), ex instanceof CallQueueOverflowException); + } + verify(queue, times(0)).put(call); + verify(queue, times(0)).add(call); } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index d08c4b4c33a..2f15d2752f7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -1135,7 +1135,8 @@ public class TestRPC extends TestRpcBase { return null; } })); - verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject()); + verify(spy, timeout(500).times(i + 1)).addInternal( + Mockito.anyObject(), Mockito.eq(false)); } try { proxy.sleep(null, newSleepRequest(100)); @@ -1206,7 +1207,8 @@ public class TestRPC extends TestRpcBase { return null; } })); - verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject()); + verify(spy, timeout(500).times(i + 1)).addInternal( + Mockito.anyObject(), Mockito.eq(false)); } // Start another sleep RPC call and verify the call is backed off due to // avg response time(3s) exceeds threshold (2s).