HADOOP-14035. Reduce fair call queue backoff's impact on clients. Contributed by Daryn Sharp.

This commit is contained in:
Kihwal Lee 2017-06-06 13:09:54 -05:00
parent d013f4134b
commit a83d7ef605
6 changed files with 445 additions and 36 deletions

View File

@ -18,8 +18,12 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import java.io.IOException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -28,11 +32,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Abstracts queue operations for different blocking queues. * Abstracts queue operations for different blocking queues.
*/ */
public class CallQueueManager<E> { public class CallQueueManager<E extends Schedulable>
extends AbstractQueue<E> implements BlockingQueue<E> {
public static final Log LOG = LogFactory.getLog(CallQueueManager.class); public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
// Number of checkpoints for empty queue. // Number of checkpoints for empty queue.
private static final int CHECKPOINT_NUM = 20; private static final int CHECKPOINT_NUM = 20;
@ -51,7 +59,7 @@ public class CallQueueManager<E> {
return (Class<? extends RpcScheduler>)schedulerClass; return (Class<? extends RpcScheduler>)schedulerClass;
} }
private final boolean clientBackOffEnabled; private volatile boolean clientBackOffEnabled;
// Atomic refs point to active callQueue // Atomic refs point to active callQueue
// We have two so we can better control swapping // We have two so we can better control swapping
@ -76,6 +84,15 @@ public class CallQueueManager<E> {
maxQueueSize + " scheduler: " + schedulerClass); maxQueueSize + " scheduler: " + schedulerClass);
} }
@VisibleForTesting // only!
CallQueueManager(BlockingQueue<E> queue, RpcScheduler scheduler,
boolean clientBackOffEnabled) {
this.putRef = new AtomicReference<BlockingQueue<E>>(queue);
this.takeRef = new AtomicReference<BlockingQueue<E>>(queue);
this.scheduler = scheduler;
this.clientBackOffEnabled = clientBackOffEnabled;
}
private static <T extends RpcScheduler> T createScheduler( private static <T extends RpcScheduler> T createScheduler(
Class<T> theClass, int priorityLevels, String ns, Configuration conf) { Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
// Used for custom, configurable scheduler // Used for custom, configurable scheduler
@ -185,13 +202,45 @@ public class CallQueueManager<E> {
return scheduler.getPriorityLevel(e); return scheduler.getPriorityLevel(e);
} }
void setClientBackoffEnabled(boolean value) {
clientBackOffEnabled = value;
}
/** /**
* Insert e into the backing queue or block until we can. * Insert e into the backing queue or block until we can. If client
* backoff is enabled this method behaves like add which throws if
* the queue overflows.
* If we block and the queue changes on us, we will insert while the * If we block and the queue changes on us, we will insert while the
* queue is drained. * queue is drained.
*/ */
@Override
public void put(E e) throws InterruptedException { public void put(E e) throws InterruptedException {
putRef.get().put(e); if (!isClientBackoffEnabled()) {
putRef.get().put(e);
} else if (shouldBackOff(e)) {
throwBackoff();
} else {
add(e);
}
}
@Override
public boolean add(E e) {
try {
return putRef.get().add(e);
} catch (CallQueueOverflowException ex) {
// queue provided a custom exception that may control if the client
// should be disconnected.
throw ex;
} catch (IllegalStateException ise) {
throwBackoff();
}
return true;
}
// ideally this behavior should be controllable too.
private void throwBackoff() throws IllegalStateException {
throw CallQueueOverflowException.DISCONNECT;
} }
/** /**
@ -199,14 +248,37 @@ public class CallQueueManager<E> {
* Return true if e is queued. * Return true if e is queued.
* Return false if the queue is full. * Return false if the queue is full.
*/ */
public boolean offer(E e) throws InterruptedException { @Override
public boolean offer(E e) {
return putRef.get().offer(e); return putRef.get().offer(e);
} }
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
return putRef.get().offer(e, timeout, unit);
}
@Override
public E peek() {
return takeRef.get().peek();
}
@Override
public E poll() {
return takeRef.get().poll();
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return takeRef.get().poll(timeout, unit);
}
/** /**
* Retrieve an E from the backing queue or block until we can. * Retrieve an E from the backing queue or block until we can.
* Guaranteed to return an element from the current queue. * Guaranteed to return an element from the current queue.
*/ */
@Override
public E take() throws InterruptedException { public E take() throws InterruptedException {
E e = null; E e = null;
@ -217,10 +289,16 @@ public class CallQueueManager<E> {
return e; return e;
} }
@Override
public int size() { public int size() {
return takeRef.get().size(); return takeRef.get().size();
} }
@Override
public int remainingCapacity() {
return takeRef.get().remainingCapacity();
}
/** /**
* Read the number of levels from the configuration. * Read the number of levels from the configuration.
* This will affect the FairCallQueue's overall capacity. * This will affect the FairCallQueue's overall capacity.
@ -304,4 +382,49 @@ public class CallQueueManager<E> {
private String stringRepr(Object o) { private String stringRepr(Object o) {
return o.getClass().getName() + '@' + Integer.toHexString(o.hashCode()); return o.getClass().getName() + '@' + Integer.toHexString(o.hashCode());
} }
@Override
public int drainTo(Collection<? super E> c) {
return takeRef.get().drainTo(c);
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
return takeRef.get().drainTo(c, maxElements);
}
@Override
public Iterator<E> iterator() {
return takeRef.get().iterator();
}
// exception that mimics the standard ISE thrown by blocking queues but
// embeds a rpc server exception for the client to retry and indicate
// if the client should be disconnected.
@SuppressWarnings("serial")
static class CallQueueOverflowException extends IllegalStateException {
private static String TOO_BUSY = "Server too busy";
static final CallQueueOverflowException KEEPALIVE =
new CallQueueOverflowException(
new RetriableException(TOO_BUSY),
RpcStatusProto.ERROR);
static final CallQueueOverflowException DISCONNECT =
new CallQueueOverflowException(
new RetriableException(TOO_BUSY + " - disconnecting"),
RpcStatusProto.FATAL);
CallQueueOverflowException(final IOException ioe,
final RpcStatusProto status) {
super("Queue full", new RpcServerException(ioe.getMessage(), ioe){
@Override
public RpcStatusProto getRpcStatusProto() {
return status;
}
});
}
@Override
public IOException getCause() {
return (IOException)super.getCause();
}
}
} }

View File

@ -36,6 +36,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
/** /**
@ -134,45 +135,84 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
/* AbstractQueue and BlockingQueue methods */ /* AbstractQueue and BlockingQueue methods */
/** /**
* Put and offer follow the same pattern: * Add, put, and offer follow the same pattern:
* 1. Get the assigned priorityLevel from the call by scheduler * 1. Get the assigned priorityLevel from the call by scheduler
* 2. Get the nth sub-queue matching this priorityLevel * 2. Get the nth sub-queue matching this priorityLevel
* 3. delegate the call to this sub-queue. * 3. delegate the call to this sub-queue.
* *
* But differ in how they handle overflow: * But differ in how they handle overflow:
* - Put will move on to the next queue until it lands on the last queue * - Add will move on to the next queue, throw on last queue overflow
* - Put will move on to the next queue, block on last queue overflow
* - Offer does not attempt other queues on overflow * - Offer does not attempt other queues on overflow
*/ */
@Override
public boolean add(E e) {
final int priorityLevel = e.getPriorityLevel();
// try offering to all queues.
if (!offerQueues(priorityLevel, e, true)) {
// only disconnect the lowest priority users that overflow the queue.
throw (priorityLevel == queues.size() - 1)
? CallQueueOverflowException.DISCONNECT
: CallQueueOverflowException.KEEPALIVE;
}
return true;
}
@Override @Override
public void put(E e) throws InterruptedException { public void put(E e) throws InterruptedException {
int priorityLevel = e.getPriorityLevel(); final int priorityLevel = e.getPriorityLevel();
// try offering to all but last queue, put on last.
final int numLevels = this.queues.size(); if (!offerQueues(priorityLevel, e, false)) {
while (true) { putQueue(queues.size() - 1, e);
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean res = q.offer(e);
if (!res) {
// Update stats
this.overflowedCalls.get(priorityLevel).getAndIncrement();
// If we failed to insert, try again on the next level
priorityLevel++;
if (priorityLevel == numLevels) {
// That was the last one, we will block on put in the last queue
// Delete this line to drop the call
this.queues.get(priorityLevel-1).put(e);
break;
}
} else {
break;
}
} }
}
/**
* Put the element in a queue of a specific priority.
* @param priority - queue priority
* @param e - element to add
*/
@VisibleForTesting
void putQueue(int priority, E e) throws InterruptedException {
queues.get(priority).put(e);
signalNotEmpty(); signalNotEmpty();
} }
/**
* Offer the element to queue of a specific priority.
* @param priority - queue priority
* @param e - element to add
* @return boolean if added to the given queue
*/
@VisibleForTesting
boolean offerQueue(int priority, E e) {
boolean ret = queues.get(priority).offer(e);
if (ret) {
signalNotEmpty();
}
return ret;
}
/**
* Offer the element to queue of the given or lower priority.
* @param priority - starting queue priority
* @param e - element to add
* @param includeLast - whether to attempt last queue
* @return boolean if added to a queue
*/
private boolean offerQueues(int priority, E e, boolean includeLast) {
int lastPriority = queues.size() - (includeLast ? 1 : 2);
for (int i=priority; i <= lastPriority; i++) {
if (offerQueue(i, e)) {
return true;
}
// Update stats
overflowedCalls.get(i).getAndIncrement();
}
return false;
}
@Override @Override
public boolean offer(E e, long timeout, TimeUnit unit) public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException { throws InterruptedException {

View File

@ -82,6 +82,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch; import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@ -2289,7 +2290,9 @@ public abstract class Server {
call.setPriorityLevel(callQueue.getPriorityLevel(call)); call.setPriorityLevel(callQueue.getPriorityLevel(call));
try { try {
queueCall(call); internalQueueCall(call);
} catch (RpcServerException rse) {
throw rse;
} catch (IOException ioe) { } catch (IOException ioe) {
throw new FatalRpcServerException( throw new FatalRpcServerException(
RpcErrorCodeProto.ERROR_RPC_SERVER, ioe); RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
@ -2427,9 +2430,19 @@ public abstract class Server {
} }
public void queueCall(Call call) throws IOException, InterruptedException { public void queueCall(Call call) throws IOException, InterruptedException {
if (!callQueue.isClientBackoffEnabled()) { // external non-rpc calls don't need server exception wrapper.
try {
internalQueueCall(call);
} catch (RpcServerException rse) {
throw (IOException)rse.getCause();
}
}
private void internalQueueCall(Call call)
throws IOException, InterruptedException {
try {
callQueue.put(call); // queue the call; maybe blocked here callQueue.put(call); // queue the call; maybe blocked here
} else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) { } catch (CallQueueOverflowException cqe) {
// If rpc scheduler indicates back off based on performance degradation // If rpc scheduler indicates back off based on performance degradation
// such as response time or rpc queue is full, we will ask the client // such as response time or rpc queue is full, we will ask the client
// to back off by throwing RetriableException. Whether the client will // to back off by throwing RetriableException. Whether the client will
@ -2437,7 +2450,8 @@ public abstract class Server {
// For example, IPC clients using FailoverOnNetworkExceptionRetry handle // For example, IPC clients using FailoverOnNetworkExceptionRetry handle
// RetriableException. // RetriableException.
rpcMetrics.incrClientBackoff(); rpcMetrics.incrClientBackoff();
throw new RetriableException("Server is too busy."); // unwrap retriable exception.
throw cqe.getCause();
} }
} }

View File

@ -19,8 +19,14 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -29,8 +35,10 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
public class TestCallQueueManager { public class TestCallQueueManager {
private CallQueueManager<FakeCall> manager; private CallQueueManager<FakeCall> manager;
@ -311,11 +319,21 @@ public class TestCallQueueManager {
assertEquals(totalCallsConsumed, totalCallsCreated); assertEquals(totalCallsConsumed, totalCallsCreated);
} }
public static class ExceptionFakeCall { public static class ExceptionFakeCall implements Schedulable {
public ExceptionFakeCall() { public ExceptionFakeCall() {
throw new IllegalArgumentException("Exception caused by call queue " + throw new IllegalArgumentException("Exception caused by call queue " +
"constructor.!!"); "constructor.!!");
} }
@Override
public UserGroupInformation getUserGroupInformation() {
return null;
}
@Override
public int getPriorityLevel() {
return 0;
}
} }
public static class ExceptionFakeScheduler { public static class ExceptionFakeScheduler {
@ -359,4 +377,62 @@ public class TestCallQueueManager {
.getMessage()); .getMessage());
} }
} }
@SuppressWarnings("unchecked")
@Test
public void testCallQueueOverflowExceptions() throws Exception {
RpcScheduler scheduler = Mockito.mock(RpcScheduler.class);
BlockingQueue<Schedulable> queue = Mockito.mock(BlockingQueue.class);
CallQueueManager<Schedulable> cqm =
Mockito.spy(new CallQueueManager<>(queue, scheduler, false));
Schedulable call = new FakeCall(0);
// call queue exceptions passed threw as-is
doThrow(CallQueueOverflowException.KEEPALIVE).when(queue).add(call);
try {
cqm.add(call);
fail("didn't throw");
} catch (CallQueueOverflowException cqe) {
assertSame(CallQueueOverflowException.KEEPALIVE, cqe);
}
// standard exception for blocking queue full converted to overflow
// exception.
doThrow(new IllegalStateException()).when(queue).add(call);
try {
cqm.add(call);
fail("didn't throw");
} catch (Exception ex) {
assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
}
// backoff disabled, put is put to queue.
reset(queue);
cqm.setClientBackoffEnabled(false);
cqm.put(call);
verify(queue, times(1)).put(call);
verify(queue, times(0)).add(call);
// backoff enabled, put is add to queue.
reset(queue);
cqm.setClientBackoffEnabled(true);
doReturn(Boolean.FALSE).when(cqm).shouldBackOff(call);
cqm.put(call);
verify(queue, times(0)).put(call);
verify(queue, times(1)).add(call);
reset(queue);
// backoff is enabled, put + scheduler backoff = overflow exception.
reset(queue);
cqm.setClientBackoffEnabled(true);
doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call);
try {
cqm.put(call);
fail("didn't fail");
} catch (Exception ex) {
assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
}
verify(queue, times(0)).put(call);
verify(queue, times(0)).add(call);
}
} }

View File

@ -18,13 +18,19 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import junit.framework.TestCase; import junit.framework.TestCase;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.ObjectName; import javax.management.ObjectName;
import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -34,7 +40,10 @@ import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
public class TestFairCallQueue extends TestCase { public class TestFairCallQueue extends TestCase {
private FairCallQueue<Schedulable> fcq; private FairCallQueue<Schedulable> fcq;
@ -133,6 +142,153 @@ public class TestFairCallQueue extends TestCase {
assertNull(fcq.poll()); assertNull(fcq.poll());
} }
@SuppressWarnings("unchecked") // for mock reset.
@Test
public void testInsertion() throws Exception {
Configuration conf = new Configuration();
// 3 queues, 2 slots each.
fcq = Mockito.spy(new FairCallQueue<Schedulable>(3, 6, "ns", conf));
Schedulable p0 = mockCall("a", 0);
Schedulable p1 = mockCall("b", 1);
Schedulable p2 = mockCall("c", 2);
// add to first queue.
Mockito.reset(fcq);
fcq.add(p0);
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(0)).offerQueue(1, p0);
Mockito.verify(fcq, times(0)).offerQueue(2, p0);
Mockito.reset(fcq);
// 0:x- 1:-- 2:--
// add to second queue.
Mockito.reset(fcq);
fcq.add(p1);
Mockito.verify(fcq, times(0)).offerQueue(0, p1);
Mockito.verify(fcq, times(1)).offerQueue(1, p1);
Mockito.verify(fcq, times(0)).offerQueue(2, p1);
// 0:x- 1:x- 2:--
// add to first queue.
Mockito.reset(fcq);
fcq.add(p0);
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(0)).offerQueue(1, p0);
Mockito.verify(fcq, times(0)).offerQueue(2, p0);
// 0:xx 1:x- 2:--
// add to first full queue spills over to second.
Mockito.reset(fcq);
fcq.add(p0);
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(1)).offerQueue(1, p0);
Mockito.verify(fcq, times(0)).offerQueue(2, p0);
// 0:xx 1:xx 2:--
// add to second full queue spills over to third.
Mockito.reset(fcq);
fcq.add(p1);
Mockito.verify(fcq, times(0)).offerQueue(0, p1);
Mockito.verify(fcq, times(1)).offerQueue(1, p1);
Mockito.verify(fcq, times(1)).offerQueue(2, p1);
// 0:xx 1:xx 2:x-
// add to first and second full queue spills over to third.
Mockito.reset(fcq);
fcq.add(p0);
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(1)).offerQueue(1, p0);
Mockito.verify(fcq, times(1)).offerQueue(2, p0);
// 0:xx 1:xx 2:xx
// adding non-lowest priority with all queues full throws a
// non-disconnecting rpc server exception.
Mockito.reset(fcq);
try {
fcq.add(p0);
fail("didn't fail");
} catch (IllegalStateException ise) {
checkOverflowException(ise, RpcStatusProto.ERROR);
}
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(1)).offerQueue(1, p0);
Mockito.verify(fcq, times(1)).offerQueue(2, p0);
// adding non-lowest priority with all queues full throws a
// non-disconnecting rpc server exception.
Mockito.reset(fcq);
try {
fcq.add(p1);
fail("didn't fail");
} catch (IllegalStateException ise) {
checkOverflowException(ise, RpcStatusProto.ERROR);
}
Mockito.verify(fcq, times(0)).offerQueue(0, p1);
Mockito.verify(fcq, times(1)).offerQueue(1, p1);
Mockito.verify(fcq, times(1)).offerQueue(2, p1);
// adding lowest priority with all queues full throws a
// fatal disconnecting rpc server exception.
Mockito.reset(fcq);
try {
fcq.add(p2);
fail("didn't fail");
} catch (IllegalStateException ise) {
checkOverflowException(ise, RpcStatusProto.FATAL);
}
Mockito.verify(fcq, times(0)).offerQueue(0, p2);
Mockito.verify(fcq, times(0)).offerQueue(1, p2);
Mockito.verify(fcq, times(1)).offerQueue(2, p2);
Mockito.reset(fcq);
// used to abort what would be a blocking operation.
Exception stopPuts = new RuntimeException();
// put should offer to all but last subqueue, only put to last subqueue.
Mockito.reset(fcq);
try {
doThrow(stopPuts).when(fcq).putQueue(anyInt(), any(Schedulable.class));
fcq.put(p0);
fail("didn't fail");
} catch (Exception e) {
assertSame(stopPuts, e);
}
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(1)).offerQueue(1, p0);
Mockito.verify(fcq, times(0)).offerQueue(2, p0); // expect put, not offer.
Mockito.verify(fcq, times(1)).putQueue(2, p0);
// put with lowest priority should not offer, just put.
Mockito.reset(fcq);
try {
doThrow(stopPuts).when(fcq).putQueue(anyInt(), any(Schedulable.class));
fcq.put(p2);
fail("didn't fail");
} catch (Exception e) {
assertSame(stopPuts, e);
}
Mockito.verify(fcq, times(0)).offerQueue(0, p2);
Mockito.verify(fcq, times(0)).offerQueue(1, p2);
Mockito.verify(fcq, times(0)).offerQueue(2, p2);
Mockito.verify(fcq, times(1)).putQueue(2, p2);
}
private void checkOverflowException(Exception ex, RpcStatusProto status) {
// should be an overflow exception
assertTrue(ex.getClass().getName() + " != CallQueueOverflowException",
ex instanceof CallQueueOverflowException);
IOException ioe = ((CallQueueOverflowException)ex).getCause();
assertNotNull(ioe);
assertTrue(ioe.getClass().getName() + " != RpcServerException",
ioe instanceof RpcServerException);
RpcServerException rse = (RpcServerException)ioe;
// check error/fatal status and if it embeds a retriable ex.
assertEquals(status, rse.getRpcStatusProto());
assertTrue(rse.getClass().getName() + " != RetriableException",
rse.getCause() instanceof RetriableException);
}
// //
// Ensure that FairCallQueue properly implements BlockingQueue // Ensure that FairCallQueue properly implements BlockingQueue
// //

View File

@ -1206,7 +1206,7 @@ public class TestRPC extends TestRpcBase {
return null; return null;
} }
})); }));
verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject()); verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
} }
// Start another sleep RPC call and verify the call is backed off due to // Start another sleep RPC call and verify the call is backed off due to
// avg response time(3s) exceeds threshold (2s). // avg response time(3s) exceeds threshold (2s).