diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 50ed3532556..2764788579a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -18,8 +18,12 @@ package org.apache.hadoop.ipc;
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -28,11 +32,15 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Abstracts queue operations for different blocking queues.
  */
-public class CallQueueManager {
+public class CallQueueManager
+    extends AbstractQueue implements BlockingQueue {
   public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
   // Number of checkpoints for empty queue.
   private static final int CHECKPOINT_NUM = 20;
@@ -76,6 +84,15 @@ public CallQueueManager(Class> backingClass,
         maxQueueSize + " scheduler: " + schedulerClass);
   }
 
+  @VisibleForTesting // only!
+  CallQueueManager(BlockingQueue queue, RpcScheduler scheduler,
+      boolean clientBackOffEnabled) {
+    this.putRef = new AtomicReference>(queue);
+    this.takeRef = new AtomicReference>(queue);
+    this.scheduler = scheduler;
+    this.clientBackOffEnabled = clientBackOffEnabled;
+  }
+
   private static T createScheduler(
       Class theClass, int priorityLevels, String ns, Configuration conf) {
     // Used for custom, configurable scheduler
@@ -190,12 +207,40 @@ void setClientBackoffEnabled(boolean value) {
   }
 
   /**
-   * Insert e into the backing queue or block until we can.
+   * Insert e into the backing queue or block until we can. If client
+   * backoff is enabled this method behaves like add which throws if
+   * the queue overflows.
    * If we block and the queue changes on us, we will insert while the
    * queue is drained.
    */
+  @Override
   public void put(E e) throws InterruptedException {
-    putRef.get().put(e);
+    if (!isClientBackoffEnabled()) {
+      putRef.get().put(e);
+    } else if (shouldBackOff(e)) {
+      throwBackoff();
+    } else {
+      add(e);
+    }
+  }
+
+  @Override
+  public boolean add(E e) {
+    try {
+      return putRef.get().add(e);
+    } catch (CallQueueOverflowException ex) {
+      // queue provided a custom exception that may control if the client
+      // should be disconnected.
+      throw ex;
+    } catch (IllegalStateException ise) {
+      throwBackoff();
+    }
+    return true;
+  }
+
+  // ideally this behavior should be controllable too.
+  private void throwBackoff() throws IllegalStateException {
+    throw CallQueueOverflowException.DISCONNECT;
   }
 
   /**
@@ -203,14 +248,37 @@ public void put(E e) throws InterruptedException {
    * Return true if e is queued.
    * Return false if the queue is full.
    */
-  public boolean offer(E e) throws InterruptedException {
+  @Override
+  public boolean offer(E e) {
     return putRef.get().offer(e);
   }
 
+  @Override
+  public boolean offer(E e, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return putRef.get().offer(e, timeout, unit);
+  }
+
+  @Override
+  public E peek() {
+    return takeRef.get().peek();
+  }
+
+  @Override
+  public E poll() {
+    return takeRef.get().poll();
+  }
+
+  @Override
+  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+    return takeRef.get().poll(timeout, unit);
+  }
+
   /**
    * Retrieve an E from the backing queue or block until we can.
    * Guaranteed to return an element from the current queue.
    */
+  @Override
   public E take() throws InterruptedException {
     E e = null;
@@ -221,10 +289,16 @@ public E take() throws InterruptedException {
     return e;
   }
 
+  @Override
   public int size() {
     return takeRef.get().size();
   }
 
+  @Override
+  public int remainingCapacity() {
+    return takeRef.get().remainingCapacity();
+  }
+
   /**
    * Read the number of levels from the configuration.
    * This will affect the FairCallQueue's overall capacity.
@@ -308,4 +382,49 @@ private boolean queueIsReallyEmpty(BlockingQueue q) {
   private String stringRepr(Object o) {
     return o.getClass().getName() + '@' + Integer.toHexString(o.hashCode());
   }
+
+  @Override
+  public int drainTo(Collection c) {
+    return takeRef.get().drainTo(c);
+  }
+
+  @Override
+  public int drainTo(Collection c, int maxElements) {
+    return takeRef.get().drainTo(c, maxElements);
+  }
+
+  @Override
+  public Iterator iterator() {
+    return takeRef.get().iterator();
+  }
+
+  // exception that mimics the standard ISE thrown by blocking queues but
+  // embeds a rpc server exception for the client to retry and indicate
+  // if the client should be disconnected.
+  @SuppressWarnings("serial")
+  static class CallQueueOverflowException extends IllegalStateException {
+    private static String TOO_BUSY = "Server too busy";
+    static final CallQueueOverflowException KEEPALIVE =
+        new CallQueueOverflowException(
+            new RetriableException(TOO_BUSY),
+            RpcStatusProto.ERROR);
+    static final CallQueueOverflowException DISCONNECT =
+        new CallQueueOverflowException(
+            new RetriableException(TOO_BUSY + " - disconnecting"),
+            RpcStatusProto.FATAL);
+
+    CallQueueOverflowException(final IOException ioe,
+        final RpcStatusProto status) {
+      super("Queue full", new RpcServerException(ioe.getMessage(), ioe){
+        @Override
+        public RpcStatusProto getRpcStatusProto() {
+          return status;
+        }
+      });
+    }
+    @Override
+    public IOException getCause() {
+      return (IOException)super.getCause();
+    }
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index 820f24cb728..8bcaf059367 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
 import org.apache.hadoop.metrics2.util.MBeans;
 
 /**
@@ -134,45 +135,84 @@ private E removeNextElement() {
 
   /* AbstractQueue and BlockingQueue methods */
 
   /**
-   * Put and offer follow the same pattern:
+   * Add, put, and offer follow the same pattern:
    * 1. Get the assigned priorityLevel from the call by scheduler
    * 2. Get the nth sub-queue matching this priorityLevel
    * 3. delegate the call to this sub-queue.
    *
    * But differ in how they handle overflow:
-   * - Put will move on to the next queue until it lands on the last queue
+   * - Add will move on to the next queue, throw on last queue overflow
+   * - Put will move on to the next queue, block on last queue overflow
    * - Offer does not attempt other queues on overflow
    */
+
+  @Override
+  public boolean add(E e) {
+    final int priorityLevel = e.getPriorityLevel();
+    // try offering to all queues.
+    if (!offerQueues(priorityLevel, e, true)) {
+      // only disconnect the lowest priority users that overflow the queue.
+      throw (priorityLevel == queues.size() - 1)
+          ? CallQueueOverflowException.DISCONNECT
+          : CallQueueOverflowException.KEEPALIVE;
+    }
+    return true;
+  }
+
   @Override
   public void put(E e) throws InterruptedException {
-    int priorityLevel = e.getPriorityLevel();
-
-    final int numLevels = this.queues.size();
-    while (true) {
-      BlockingQueue q = this.queues.get(priorityLevel);
-      boolean res = q.offer(e);
-      if (!res) {
-        // Update stats
-        this.overflowedCalls.get(priorityLevel).getAndIncrement();
-
-        // If we failed to insert, try again on the next level
-        priorityLevel++;
-
-        if (priorityLevel == numLevels) {
-          // That was the last one, we will block on put in the last queue
-          // Delete this line to drop the call
-          this.queues.get(priorityLevel-1).put(e);
-          break;
-        }
-      } else {
-        break;
-      }
+    final int priorityLevel = e.getPriorityLevel();
+    // try offering to all but last queue, put on last.
+    if (!offerQueues(priorityLevel, e, false)) {
+      putQueue(queues.size() - 1, e);
     }
+  }
-
+  /**
+   * Put the element in a queue of a specific priority.
+   * @param priority - queue priority
+   * @param e - element to add
+   */
+  @VisibleForTesting
+  void putQueue(int priority, E e) throws InterruptedException {
+    queues.get(priority).put(e);
     signalNotEmpty();
   }
+  /**
+   * Offer the element to queue of a specific priority.
+   * @param priority - queue priority
+   * @param e - element to add
+   * @return boolean if added to the given queue
+   */
+  @VisibleForTesting
+  boolean offerQueue(int priority, E e) {
+    boolean ret = queues.get(priority).offer(e);
+    if (ret) {
+      signalNotEmpty();
+    }
+    return ret;
+  }
+
+  /**
+   * Offer the element to queue of the given or lower priority.
+   * @param priority - starting queue priority
+   * @param e - element to add
+   * @param includeLast - whether to attempt last queue
+   * @return boolean if added to a queue
+   */
+  private boolean offerQueues(int priority, E e, boolean includeLast) {
+    int lastPriority = queues.size() - (includeLast ? 1 : 2);
+    for (int i=priority; i <= lastPriority; i++) {
+      if (offerQueue(i, e)) {
+        return true;
+      }
+      // Update stats
+      overflowedCalls.get(i).getAndIncrement();
+    }
+    return false;
+  }
+
   @Override
   public boolean offer(E e, long timeout, TimeUnit unit)
       throws InterruptedException {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index f3b9a8241f6..df108b80aca 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -2479,7 +2480,9 @@ private void processRpcRequest(RpcRequestHeaderProto header,
     call.setPriorityLevel(callQueue.getPriorityLevel(call));
 
     try {
-      queueCall(call);
+      internalQueueCall(call);
+    } catch (RpcServerException rse) {
+      throw rse;
     } catch (IOException ioe) {
       throw new FatalRpcServerException(
           RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
@@ -2616,9 +2619,19 @@ private synchronized void close() {
   }
 
   public void queueCall(Call call) throws IOException, InterruptedException {
-    if (!callQueue.isClientBackoffEnabled()) {
+    // external non-rpc calls don't need server exception wrapper.
+    try {
+      internalQueueCall(call);
+    } catch (RpcServerException rse) {
+      throw (IOException)rse.getCause();
+    }
+  }
+
+  private void internalQueueCall(Call call)
+      throws IOException, InterruptedException {
+    try {
       callQueue.put(call); // queue the call; maybe blocked here
-    } else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
+    } catch (CallQueueOverflowException cqe) {
       // If rpc scheduler indicates back off based on performance degradation
       // such as response time or rpc queue is full, we will ask the client
       // to back off by throwing RetriableException. Whether the client will
@@ -2626,7 +2639,8 @@ public void queueCall(Call call) throws IOException, InterruptedException {
       // For example, IPC clients using FailoverOnNetworkExceptionRetry handle
       // RetriableException.
       rpcMetrics.incrClientBackoff();
-      throw new RetriableException("Server is too busy.");
+      // unwrap retriable exception.
+      throw cqe.getCause();
     }
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
index 121165785cd..a5a0b0008cb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
@@ -19,8 +19,14 @@ package org.apache.hadoop.ipc;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,8 +35,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestCallQueueManager {
   private CallQueueManager manager;
@@ -311,11 +319,21 @@ public void testSwapUnderContention() throws InterruptedException {
     assertEquals(totalCallsConsumed, totalCallsCreated);
   }
 
-  public static class ExceptionFakeCall {
+  public static class ExceptionFakeCall implements Schedulable {
     public ExceptionFakeCall() {
       throw new IllegalArgumentException("Exception caused by call queue " +
           "constructor.!!");
     }
+
+    @Override
+    public UserGroupInformation getUserGroupInformation() {
+      return null;
+    }
+
+    @Override
+    public int getPriorityLevel() {
+      return 0;
+    }
   }
 
   public static class ExceptionFakeScheduler {
@@ -359,4 +377,62 @@ public void testSchedulerConstructorException() throws InterruptedException {
           .getMessage());
     }
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCallQueueOverflowExceptions() throws Exception {
+    RpcScheduler scheduler = Mockito.mock(RpcScheduler.class);
+    BlockingQueue queue = Mockito.mock(BlockingQueue.class);
+    CallQueueManager cqm =
+        Mockito.spy(new CallQueueManager<>(queue, scheduler, false));
+    Schedulable call = new FakeCall(0);
+
+    // call queue exceptions passed through as-is
+    doThrow(CallQueueOverflowException.KEEPALIVE).when(queue).add(call);
+    try {
+      cqm.add(call);
+      fail("didn't throw");
+    } catch (CallQueueOverflowException cqe) {
+      assertSame(CallQueueOverflowException.KEEPALIVE, cqe);
+    }
+
+    // standard exception for blocking queue full converted to overflow
+    // exception.
+    doThrow(new IllegalStateException()).when(queue).add(call);
+    try {
+      cqm.add(call);
+      fail("didn't throw");
+    } catch (Exception ex) {
+      assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
+    }
+
+    // backoff disabled, put is put to queue.
+    reset(queue);
+    cqm.setClientBackoffEnabled(false);
+    cqm.put(call);
+    verify(queue, times(1)).put(call);
+    verify(queue, times(0)).add(call);
+
+    // backoff enabled, put is add to queue.
+    reset(queue);
+    cqm.setClientBackoffEnabled(true);
+    doReturn(Boolean.FALSE).when(cqm).shouldBackOff(call);
+    cqm.put(call);
+    verify(queue, times(0)).put(call);
+    verify(queue, times(1)).add(call);
+    reset(queue);
+
+    // backoff is enabled, put + scheduler backoff = overflow exception.
+    reset(queue);
+    cqm.setClientBackoffEnabled(true);
+    doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call);
+    try {
+      cqm.put(call);
+      fail("didn't fail");
+    } catch (Exception ex) {
+      assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
+    }
+    verify(queue, times(0)).put(call);
+    verify(queue, times(0)).add(call);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index 901a7718788..6b1cd294e9c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -18,13 +18,19 @@ package org.apache.hadoop.ipc;
 
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 import junit.framework.TestCase;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -34,7 +40,10 @@ import java.util.concurrent.BlockingQueue;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 
 public class TestFairCallQueue extends TestCase {
   private FairCallQueue fcq;
@@ -133,6 +142,153 @@ public int getAndAdvanceCurrentIndex() {
     assertNull(fcq.poll());
   }
 
+  @SuppressWarnings("unchecked") // for mock reset.
+  @Test
+  public void testInsertion() throws Exception {
+    Configuration conf = new Configuration();
+    // 3 queues, 2 slots each.
+    fcq = Mockito.spy(new FairCallQueue(3, 6, "ns", conf));
+
+    Schedulable p0 = mockCall("a", 0);
+    Schedulable p1 = mockCall("b", 1);
+    Schedulable p2 = mockCall("c", 2);
+
+    // add to first queue.
+    Mockito.reset(fcq);
+    fcq.add(p0);
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p0);
+    Mockito.reset(fcq);
+    // 0:x- 1:-- 2:--
+
+    // add to second queue.
+    Mockito.reset(fcq);
+    fcq.add(p1);
+    Mockito.verify(fcq, times(0)).offerQueue(0, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p1);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p1);
+    // 0:x- 1:x- 2:--
+
+    // add to first queue.
+    Mockito.reset(fcq);
+    fcq.add(p0);
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p0);
+    // 0:xx 1:x- 2:--
+
+    // add to first full queue spills over to second.
+    Mockito.reset(fcq);
+    fcq.add(p0);
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p0);
+    // 0:xx 1:xx 2:--
+
+    // add to second full queue spills over to third.
+    Mockito.reset(fcq);
+    fcq.add(p1);
+    Mockito.verify(fcq, times(0)).offerQueue(0, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p1);
+    // 0:xx 1:xx 2:x-
+
+    // add to first and second full queue spills over to third.
+    Mockito.reset(fcq);
+    fcq.add(p0);
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p0);
+    // 0:xx 1:xx 2:xx
+
+    // adding non-lowest priority with all queues full throws a
+    // non-disconnecting rpc server exception.
+    Mockito.reset(fcq);
+    try {
+      fcq.add(p0);
+      fail("didn't fail");
+    } catch (IllegalStateException ise) {
+      checkOverflowException(ise, RpcStatusProto.ERROR);
+    }
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p0);
+
+    // adding non-lowest priority with all queues full throws a
+    // non-disconnecting rpc server exception.
+    Mockito.reset(fcq);
+    try {
+      fcq.add(p1);
+      fail("didn't fail");
+    } catch (IllegalStateException ise) {
+      checkOverflowException(ise, RpcStatusProto.ERROR);
+    }
+    Mockito.verify(fcq, times(0)).offerQueue(0, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p1);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p1);
+
+    // adding lowest priority with all queues full throws a
+    // fatal disconnecting rpc server exception.
+    Mockito.reset(fcq);
+    try {
+      fcq.add(p2);
+      fail("didn't fail");
+    } catch (IllegalStateException ise) {
+      checkOverflowException(ise, RpcStatusProto.FATAL);
+    }
+    Mockito.verify(fcq, times(0)).offerQueue(0, p2);
+    Mockito.verify(fcq, times(0)).offerQueue(1, p2);
+    Mockito.verify(fcq, times(1)).offerQueue(2, p2);
+    Mockito.reset(fcq);
+
+    // used to abort what would be a blocking operation.
+    Exception stopPuts = new RuntimeException();
+
+    // put should offer to all but last subqueue, only put to last subqueue.
+    Mockito.reset(fcq);
+    try {
+      doThrow(stopPuts).when(fcq).putQueue(anyInt(), anyObject());
+      fcq.put(p0);
+      fail("didn't fail");
+    } catch (Exception e) {
+      assertSame(stopPuts, e);
+    }
+    Mockito.verify(fcq, times(1)).offerQueue(0, p0);
+    Mockito.verify(fcq, times(1)).offerQueue(1, p0);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p0); // expect put, not offer.
+    Mockito.verify(fcq, times(1)).putQueue(2, p0);
+
+    // put with lowest priority should not offer, just put.
+    Mockito.reset(fcq);
+    try {
+      doThrow(stopPuts).when(fcq).putQueue(anyInt(), anyObject());
+      fcq.put(p2);
+      fail("didn't fail");
+    } catch (Exception e) {
+      assertSame(stopPuts, e);
+    }
+    Mockito.verify(fcq, times(0)).offerQueue(0, p2);
+    Mockito.verify(fcq, times(0)).offerQueue(1, p2);
+    Mockito.verify(fcq, times(0)).offerQueue(2, p2);
+    Mockito.verify(fcq, times(1)).putQueue(2, p2);
+  }
+
+  private void checkOverflowException(Exception ex, RpcStatusProto status) {
+    // should be an overflow exception
+    assertTrue(ex.getClass().getName() + " != CallQueueOverflowException",
+        ex instanceof CallQueueOverflowException);
+    IOException ioe = ((CallQueueOverflowException)ex).getCause();
+    assertNotNull(ioe);
+    assertTrue(ioe.getClass().getName() + " != RpcServerException",
+        ioe instanceof RpcServerException);
+    RpcServerException rse = (RpcServerException)ioe;
+    // check error/fatal status and if it embeds a retriable ex.
+    assertEquals(status, rse.getRpcStatusProto());
+    assertTrue(rse.getClass().getName() + " != RetriableException",
+        rse.getCause() instanceof RetriableException);
+  }
+
   //
   // Ensure that FairCallQueue properly implements BlockingQueue
   //
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 3cc99164bde..166b205714e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1123,7 +1123,7 @@ public Void call() throws ServiceException, InterruptedException {
           return null;
         }
       }));
-      verify(spy, timeout(500).times(i + 1)).offer(Mockito.anyObject());
+      verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject());
     }
     try {
       proxy.sleep(null, newSleepRequest(100));
@@ -1194,7 +1194,7 @@ public Void call() throws ServiceException, InterruptedException {
           return null;
         }
       }));
-      verify(spy, timeout(500).times(i + 1)).offer(Mockito.anyObject());
+      verify(spy, timeout(500).times(i + 1)).add(Mockito.anyObject());
     }
     // Start another sleep RPC call and verify the call is backed off due to
     // avg response time(3s) exceeds threshold (2s).
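
A standalone, JDK-only sketch of the overflow-translation idea used by the patch: add() delegates to the backing BlockingQueue and converts its bare "queue full" IllegalStateException into a richer exception that also carries a policy hint (keepalive vs. disconnect). The class and field names below (OverflowSketch, OverflowException, addOrThrow) are invented for illustration and are not Hadoop APIs; the real patch uses CallQueueManager.CallQueueOverflowException with an embedded RpcServerException instead of a boolean flag.

    // Illustration only: mirrors the add()-side translation pattern above.
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class OverflowSketch {
      // Mimics CallQueueOverflowException: an IllegalStateException that also
      // tells the caller what to do with the client connection.
      static class OverflowException extends IllegalStateException {
        final boolean disconnect;
        OverflowException(boolean disconnect) {
          super("Queue full");
          this.disconnect = disconnect;
        }
      }

      // Mirrors CallQueueManager.add(): delegate to the backing queue and
      // convert the generic IllegalStateException thrown when it is full.
      static <E> void addOrThrow(BlockingQueue<E> queue, E element) {
        try {
          queue.add(element);                // throws IllegalStateException when full
        } catch (IllegalStateException ise) {
          throw new OverflowException(true); // "true" plays the role of DISCONNECT
        }
      }

      public static void main(String[] args) {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        addOrThrow(q, "first");              // fits
        try {
          addOrThrow(q, "second");           // overflows the 1-slot queue
        } catch (OverflowException oe) {
          System.out.println("overflow, disconnect=" + oe.disconnect);
        }
      }
    }

In the patch itself the same translation happens in CallQueueManager.add(), and Server.internalQueueCall() then unwraps the embedded RetriableException so the client can decide whether to retry.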