From 0de8b55a095ada2b98c0a41899651bd8e524f42e Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Thu, 14 Mar 2019 10:12:58 -0700 Subject: [PATCH] HADOOP-16192. Fix CallQueue backoff bugs: perform backoff when add() is used and update backoff when refreshed. (cherry-picked from 8c95cb9d6bef369fef6a8364f0c0764eba90e44a) --- .../org/apache/hadoop/ipc/CallQueueManager.java | 11 ++++++++++- .../src/main/java/org/apache/hadoop/ipc/Server.java | 1 + .../org/apache/hadoop/ipc/TestCallQueueManager.java | 13 +++++++++++++ .../test/java/org/apache/hadoop/ipc/TestRPC.java | 7 ++++--- 4 files changed, 28 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index 29649a6b6ff..9731e13d86b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -222,12 +222,21 @@ public class CallQueueManager } else if (shouldBackOff(e)) { throwBackoff(); } else { - add(e); + // No need to re-check backoff criteria since they were just checked + addInternal(e, false); } } @Override public boolean add(E e) { + return addInternal(e, true); + } + + @VisibleForTesting + boolean addInternal(E e, boolean checkBackoff) { + if (checkBackoff && isClientBackoffEnabled() && shouldBackOff(e)) { + throwBackoff(); + } try { return putRef.get().add(e); } catch (CallQueueOverflowException ex) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 76d9c400b7a..9285b292464 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -666,6 +666,7 @@ public abstract class Server { CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); callQueue.swapQueue(getSchedulerClass(prefix, conf), getQueueClass(prefix, conf), maxQueueSize, prefix, conf); + callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf)); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index a5a0b0008cb..2b739966d84 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -434,5 +434,18 @@ public class TestCallQueueManager { } verify(queue, times(0)).put(call); verify(queue, times(0)).add(call); + + // backoff is enabled, add + scheduler backoff = overflow exception. + reset(queue); + cqm.setClientBackoffEnabled(true); + doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call); + try { + cqm.add(call); + fail("didn't fail"); + } catch (Exception ex) { + assertTrue(ex.toString(), ex instanceof CallQueueOverflowException); + } + verify(queue, times(0)).put(call); + verify(queue, times(0)).add(call); } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index c99b40372ee..c6bd3b32a6e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -52,7 +52,6 @@ import org.apache.hadoop.test.Whitebox; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -95,6 +94,8 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -1131,7 +1132,7 @@ public class TestRPC extends TestRpcBase { return null; } })); - verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject()); + verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false)); } try { proxy.sleep(null, newSleepRequest(100)); @@ -1202,7 +1203,7 @@ public class TestRPC extends TestRpcBase { return null; } })); - verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject()); + verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false)); } // Start another sleep RPC call and verify the call is backed off due to // avg response time(3s) exceeds threshold (2s).