diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index d1bd1807b03..e73ef5371a8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -221,12 +221,21 @@ public class CallQueueManager } else if (shouldBackOff(e)) { throwBackoff(); } else { - add(e); + // No need to re-check backoff criteria since they were just checked + addInternal(e, false); } } @Override public boolean add(E e) { + return addInternal(e, true); + } + + @VisibleForTesting + boolean addInternal(E e, boolean checkBackoff) { + if (checkBackoff && isClientBackoffEnabled() && shouldBackOff(e)) { + throwBackoff(); + } try { return putRef.get().add(e); } catch (CallQueueOverflowException ex) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index c5da3b18097..041b2b4caf7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -662,6 +662,7 @@ public abstract class Server { CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); callQueue.swapQueue(getSchedulerClass(prefix, conf), getQueueClass(prefix, conf), maxQueueSize, prefix, conf); + callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf)); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index a5a0b0008cb..2b739966d84 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -434,5 +434,18 @@ public class TestCallQueueManager { } verify(queue, times(0)).put(call); verify(queue, times(0)).add(call); + + // backoff is enabled, add + scheduler backoff = overflow exception. + reset(queue); + cqm.setClientBackoffEnabled(true); + doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call); + try { + cqm.add(call); + fail("didn't fail"); + } catch (Exception ex) { + assertTrue(ex.toString(), ex instanceof CallQueueOverflowException); + } + verify(queue, times(0)).put(call); + verify(queue, times(0)).add(call); } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index c6209d2483f..f3bc625d2a9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -50,7 +50,6 @@ import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.test.MockitoUtil; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,6 +95,8 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -1123,7 +1124,7 @@ public class TestRPC extends TestRpcBase { return null; } })); - verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject()); + verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false)); } try { proxy.sleep(null, newSleepRequest(100)); @@ -1194,7 +1195,7 @@ public class TestRPC extends TestRpcBase { return null; } })); - verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject()); + verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false)); } // Start another sleep RPC call and verify the call is backed off due to // avg response time(3s) exceeds threshold (2s).