HADOOP-14035. Reduce fair call queue backoff's impact on clients. Contributed by Daryn Sharp.

This commit is contained in:
Kihwal Lee 2017-06-06 12:19:44 -05:00
parent e889c826d7
commit f7e597b56d
6 changed files with 441 additions and 36 deletions

View File

@ -18,8 +18,12 @@
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -28,11 +32,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import com.google.common.annotations.VisibleForTesting;
/**
* Abstracts queue operations for different blocking queues.
*/
public class CallQueueManager<E> {
public class CallQueueManager<E extends Schedulable>
extends AbstractQueue<E> implements BlockingQueue<E> {
public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
// Number of checkpoints for empty queue.
private static final int CHECKPOINT_NUM = 20;
@ -76,6 +84,15 @@ public class CallQueueManager<E> {
maxQueueSize + " scheduler: " + schedulerClass);
}
@VisibleForTesting // only!
CallQueueManager(BlockingQueue<E> queue, RpcScheduler scheduler,
boolean clientBackOffEnabled) {
this.putRef = new AtomicReference<BlockingQueue<E>>(queue);
this.takeRef = new AtomicReference<BlockingQueue<E>>(queue);
this.scheduler = scheduler;
this.clientBackOffEnabled = clientBackOffEnabled;
}
private static <T extends RpcScheduler> T createScheduler(
Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
// Used for custom, configurable scheduler
@ -190,12 +207,40 @@ public class CallQueueManager<E> {
}
/**
* Insert e into the backing queue or block until we can.
* Insert e into the backing queue or block until we can. If client
* backoff is enabled this method behaves like add which throws if
* the queue overflows.
* If we block and the queue changes on us, we will insert while the
* queue is drained.
*/
@Override
public void put(E e) throws InterruptedException {
putRef.get().put(e);
if (!isClientBackoffEnabled()) {
putRef.get().put(e);
} else if (shouldBackOff(e)) {
throwBackoff();
} else {
add(e);
}
}
@Override
public boolean add(E e) {
try {
return putRef.get().add(e);
} catch (CallQueueOverflowException ex) {
// queue provided a custom exception that may control if the client
// should be disconnected.
throw ex;
} catch (IllegalStateException ise) {
throwBackoff();
}
return true;
}
// ideally this behavior should be controllable too.
private void throwBackoff() throws IllegalStateException {
throw CallQueueOverflowException.DISCONNECT;
}
/**
@ -203,14 +248,37 @@ public class CallQueueManager<E> {
* Return true if e is queued.
* Return false if the queue is full.
*/
public boolean offer(E e) throws InterruptedException {
@Override
public boolean offer(E e) {
return putRef.get().offer(e);
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
return putRef.get().offer(e, timeout, unit);
}
@Override
public E peek() {
return takeRef.get().peek();
}
@Override
public E poll() {
return takeRef.get().poll();
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return takeRef.get().poll(timeout, unit);
}
/**
* Retrieve an E from the backing queue or block until we can.
* Guaranteed to return an element from the current queue.
*/
@Override
public E take() throws InterruptedException {
E e = null;
@ -221,10 +289,16 @@ public class CallQueueManager<E> {
return e;
}
@Override
public int size() {
return takeRef.get().size();
}
@Override
public int remainingCapacity() {
return takeRef.get().remainingCapacity();
}
/**
* Read the number of levels from the configuration.
* This will affect the FairCallQueue's overall capacity.
@ -308,4 +382,49 @@ public class CallQueueManager<E> {
private String stringRepr(Object o) {
return o.getClass().getName() + '@' + Integer.toHexString(o.hashCode());
}
@Override
public int drainTo(Collection<? super E> c) {
return takeRef.get().drainTo(c);
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
return takeRef.get().drainTo(c, maxElements);
}
@Override
public Iterator<E> iterator() {
return takeRef.get().iterator();
}
// exception that mimics the standard ISE thrown by blocking queues but
// embeds a rpc server exception for the client to retry and indicate
// if the client should be disconnected.
@SuppressWarnings("serial")
static class CallQueueOverflowException extends IllegalStateException {
private static String TOO_BUSY = "Server too busy";
static final CallQueueOverflowException KEEPALIVE =
new CallQueueOverflowException(
new RetriableException(TOO_BUSY),
RpcStatusProto.ERROR);
static final CallQueueOverflowException DISCONNECT =
new CallQueueOverflowException(
new RetriableException(TOO_BUSY + " - disconnecting"),
RpcStatusProto.FATAL);
CallQueueOverflowException(final IOException ioe,
final RpcStatusProto status) {
super("Queue full", new RpcServerException(ioe.getMessage(), ioe){
@Override
public RpcStatusProto getRpcStatusProto() {
return status;
}
});
}
@Override
public IOException getCause() {
return (IOException)super.getCause();
}
}
}

View File

@ -36,6 +36,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.metrics2.util.MBeans;
/**
@ -134,45 +135,84 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
/* AbstractQueue and BlockingQueue methods */
/**
* Put and offer follow the same pattern:
* Add, put, and offer follow the same pattern:
* 1. Get the assigned priorityLevel from the call by scheduler
* 2. Get the nth sub-queue matching this priorityLevel
* 3. delegate the call to this sub-queue.
*
* But differ in how they handle overflow:
* - Put will move on to the next queue until it lands on the last queue
* - Add will move on to the next queue, throw on last queue overflow
* - Put will move on to the next queue, block on last queue overflow
* - Offer does not attempt other queues on overflow
*/
@Override
public boolean add(E e) {
final int priorityLevel = e.getPriorityLevel();
// try offering to all queues.
if (!offerQueues(priorityLevel, e, true)) {
// only disconnect the lowest priority users that overflow the queue.
throw (priorityLevel == queues.size() - 1)
? CallQueueOverflowException.DISCONNECT
: CallQueueOverflowException.KEEPALIVE;
}
return true;
}
@Override
public void put(E e) throws InterruptedException {
int priorityLevel = e.getPriorityLevel();
final int numLevels = this.queues.size();
while (true) {
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean res = q.offer(e);
if (!res) {
// Update stats
this.overflowedCalls.get(priorityLevel).getAndIncrement();
// If we failed to insert, try again on the next level
priorityLevel++;
if (priorityLevel == numLevels) {
// That was the last one, we will block on put in the last queue
// Delete this line to drop the call
this.queues.get(priorityLevel-1).put(e);
break;
}
} else {
break;
}
final int priorityLevel = e.getPriorityLevel();
// try offering to all but last queue, put on last.
if (!offerQueues(priorityLevel, e, false)) {
putQueue(queues.size() - 1, e);
}
}
/**
* Put the element in a queue of a specific priority.
* @param priority - queue priority
* @param e - element to add
*/
@VisibleForTesting
void putQueue(int priority, E e) throws InterruptedException {
queues.get(priority).put(e);
signalNotEmpty();
}
/**
* Offer the element to queue of a specific priority.
* @param priority - queue priority
* @param e - element to add
* @return boolean if added to the given queue
*/
@VisibleForTesting
boolean offerQueue(int priority, E e) {
boolean ret = queues.get(priority).offer(e);
if (ret) {
signalNotEmpty();
}
return ret;
}
/**
* Offer the element to queue of the given or lower priority.
* @param priority - starting queue priority
* @param e - element to add
* @param includeLast - whether to attempt last queue
* @return boolean if added to a queue
*/
private boolean offerQueues(int priority, E e, boolean includeLast) {
int lastPriority = queues.size() - (includeLast ? 1 : 2);
for (int i=priority; i <= lastPriority; i++) {
if (offerQueue(i, e)) {
return true;
}
// Update stats
overflowedCalls.get(i).getAndIncrement();
}
return false;
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

View File

@ -82,6 +82,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@ -2406,7 +2407,9 @@ public abstract class Server {
call.setPriorityLevel(callQueue.getPriorityLevel(call));
try {
queueCall(call);
internalQueueCall(call);
} catch (RpcServerException rse) {
throw rse;
} catch (IOException ioe) {
throw new FatalRpcServerException(
RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
@ -2544,9 +2547,19 @@ public abstract class Server {
}
public void queueCall(Call call) throws IOException, InterruptedException {
if (!callQueue.isClientBackoffEnabled()) {
// external non-rpc calls don't need server exception wrapper.
try {
internalQueueCall(call);
} catch (RpcServerException rse) {
throw (IOException)rse.getCause();
}
}
private void internalQueueCall(Call call)
throws IOException, InterruptedException {
try {
callQueue.put(call); // queue the call; maybe blocked here
} else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
} catch (CallQueueOverflowException cqe) {
// If rpc scheduler indicates back off based on performance degradation
// such as response time or rpc queue is full, we will ask the client
// to back off by throwing RetriableException. Whether the client will
@ -2554,7 +2567,8 @@ public abstract class Server {
// For example, IPC clients using FailoverOnNetworkExceptionRetry handle
// RetriableException.
rpcMetrics.incrClientBackoff();
throw new RetriableException("Server is too busy.");
// unwrap retriable exception.
throw cqe.getCause();
}
}

View File

@ -19,8 +19,14 @@
package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.HashMap;
@ -29,8 +35,10 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
import org.mockito.Mockito;
public class TestCallQueueManager {
private CallQueueManager<FakeCall> manager;
@ -311,11 +319,21 @@ public class TestCallQueueManager {
assertEquals(totalCallsConsumed, totalCallsCreated);
}
public static class ExceptionFakeCall {
public static class ExceptionFakeCall implements Schedulable {
public ExceptionFakeCall() {
throw new IllegalArgumentException("Exception caused by call queue " +
"constructor.!!");
}
@Override
public UserGroupInformation getUserGroupInformation() {
return null;
}
@Override
public int getPriorityLevel() {
return 0;
}
}
public static class ExceptionFakeScheduler {
@ -359,4 +377,62 @@ public class TestCallQueueManager {
.getMessage());
}
}
@SuppressWarnings("unchecked")
@Test
public void testCallQueueOverflowExceptions() throws Exception {
RpcScheduler scheduler = Mockito.mock(RpcScheduler.class);
BlockingQueue<Schedulable> queue = Mockito.mock(BlockingQueue.class);
CallQueueManager<Schedulable> cqm =
Mockito.spy(new CallQueueManager<>(queue, scheduler, false));
Schedulable call = new FakeCall(0);
// call queue exceptions passed threw as-is
doThrow(CallQueueOverflowException.KEEPALIVE).when(queue).add(call);
try {
cqm.add(call);
fail("didn't throw");
} catch (CallQueueOverflowException cqe) {
assertSame(CallQueueOverflowException.KEEPALIVE, cqe);
}
// standard exception for blocking queue full converted to overflow
// exception.
doThrow(new IllegalStateException()).when(queue).add(call);
try {
cqm.add(call);
fail("didn't throw");
} catch (Exception ex) {
assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
}
// backoff disabled, put is put to queue.
reset(queue);
cqm.setClientBackoffEnabled(false);
cqm.put(call);
verify(queue, times(1)).put(call);
verify(queue, times(0)).add(call);
// backoff enabled, put is add to queue.
reset(queue);
cqm.setClientBackoffEnabled(true);
doReturn(Boolean.FALSE).when(cqm).shouldBackOff(call);
cqm.put(call);
verify(queue, times(0)).put(call);
verify(queue, times(1)).add(call);
reset(queue);
// backoff is enabled, put + scheduler backoff = overflow exception.
reset(queue);
cqm.setClientBackoffEnabled(true);
doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call);
try {
cqm.put(call);
fail("didn't fail");
} catch (Exception ex) {
assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
}
verify(queue, times(0)).put(call);
verify(queue, times(0)).add(call);
}
}

View File

@ -18,13 +18,19 @@
package org.apache.hadoop.ipc;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import junit.framework.TestCase;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -34,7 +40,10 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
public class TestFairCallQueue extends TestCase {
private FairCallQueue<Schedulable> fcq;
@ -133,6 +142,153 @@ public class TestFairCallQueue extends TestCase {
assertNull(fcq.poll());
}
@SuppressWarnings("unchecked") // for mock reset.
@Test
public void testInsertion() throws Exception {
Configuration conf = new Configuration();
// 3 queues, 2 slots each.
fcq = Mockito.spy(new FairCallQueue<Schedulable>(3, 6, "ns", conf));
Schedulable p0 = mockCall("a", 0);
Schedulable p1 = mockCall("b", 1);
Schedulable p2 = mockCall("c", 2);
// add to first queue.
Mockito.reset(fcq);
fcq.add(p0);
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(0)).offerQueue(1, p0);
Mockito.verify(fcq, times(0)).offerQueue(2, p0);
Mockito.reset(fcq);
// 0:x- 1:-- 2:--
// add to second queue.
Mockito.reset(fcq);
fcq.add(p1);
Mockito.verify(fcq, times(0)).offerQueue(0, p1);
Mockito.verify(fcq, times(1)).offerQueue(1, p1);
Mockito.verify(fcq, times(0)).offerQueue(2, p1);
// 0:x- 1:x- 2:--
// add to first queue.
Mockito.reset(fcq);
fcq.add(p0);
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(0)).offerQueue(1, p0);
Mockito.verify(fcq, times(0)).offerQueue(2, p0);
// 0:xx 1:x- 2:--
// add to first full queue spills over to second.
Mockito.reset(fcq);
fcq.add(p0);
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(1)).offerQueue(1, p0);
Mockito.verify(fcq, times(0)).offerQueue(2, p0);
// 0:xx 1:xx 2:--
// add to second full queue spills over to third.
Mockito.reset(fcq);
fcq.add(p1);
Mockito.verify(fcq, times(0)).offerQueue(0, p1);
Mockito.verify(fcq, times(1)).offerQueue(1, p1);
Mockito.verify(fcq, times(1)).offerQueue(2, p1);
// 0:xx 1:xx 2:x-
// add to first and second full queue spills over to third.
Mockito.reset(fcq);
fcq.add(p0);
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(1)).offerQueue(1, p0);
Mockito.verify(fcq, times(1)).offerQueue(2, p0);
// 0:xx 1:xx 2:xx
// adding non-lowest priority with all queues full throws a
// non-disconnecting rpc server exception.
Mockito.reset(fcq);
try {
fcq.add(p0);
fail("didn't fail");
} catch (IllegalStateException ise) {
checkOverflowException(ise, RpcStatusProto.ERROR);
}
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(1)).offerQueue(1, p0);
Mockito.verify(fcq, times(1)).offerQueue(2, p0);
// adding non-lowest priority with all queues full throws a
// non-disconnecting rpc server exception.
Mockito.reset(fcq);
try {
fcq.add(p1);
fail("didn't fail");
} catch (IllegalStateException ise) {
checkOverflowException(ise, RpcStatusProto.ERROR);
}
Mockito.verify(fcq, times(0)).offerQueue(0, p1);
Mockito.verify(fcq, times(1)).offerQueue(1, p1);
Mockito.verify(fcq, times(1)).offerQueue(2, p1);
// adding lowest priority with all queues full throws a
// fatal disconnecting rpc server exception.
Mockito.reset(fcq);
try {
fcq.add(p2);
fail("didn't fail");
} catch (IllegalStateException ise) {
checkOverflowException(ise, RpcStatusProto.FATAL);
}
Mockito.verify(fcq, times(0)).offerQueue(0, p2);
Mockito.verify(fcq, times(0)).offerQueue(1, p2);
Mockito.verify(fcq, times(1)).offerQueue(2, p2);
Mockito.reset(fcq);
// used to abort what would be a blocking operation.
Exception stopPuts = new RuntimeException();
// put should offer to all but last subqueue, only put to last subqueue.
Mockito.reset(fcq);
try {
doThrow(stopPuts).when(fcq).putQueue(anyInt(), any(Schedulable.class));
fcq.put(p0);
fail("didn't fail");
} catch (Exception e) {
assertSame(stopPuts, e);
}
Mockito.verify(fcq, times(1)).offerQueue(0, p0);
Mockito.verify(fcq, times(1)).offerQueue(1, p0);
Mockito.verify(fcq, times(0)).offerQueue(2, p0); // expect put, not offer.
Mockito.verify(fcq, times(1)).putQueue(2, p0);
// put with lowest priority should not offer, just put.
Mockito.reset(fcq);
try {
doThrow(stopPuts).when(fcq).putQueue(anyInt(), any(Schedulable.class));
fcq.put(p2);
fail("didn't fail");
} catch (Exception e) {
assertSame(stopPuts, e);
}
Mockito.verify(fcq, times(0)).offerQueue(0, p2);
Mockito.verify(fcq, times(0)).offerQueue(1, p2);
Mockito.verify(fcq, times(0)).offerQueue(2, p2);
Mockito.verify(fcq, times(1)).putQueue(2, p2);
}
private void checkOverflowException(Exception ex, RpcStatusProto status) {
// should be an overflow exception
assertTrue(ex.getClass().getName() + " != CallQueueOverflowException",
ex instanceof CallQueueOverflowException);
IOException ioe = ((CallQueueOverflowException)ex).getCause();
assertNotNull(ioe);
assertTrue(ioe.getClass().getName() + " != RpcServerException",
ioe instanceof RpcServerException);
RpcServerException rse = (RpcServerException)ioe;
// check error/fatal status and if it embeds a retriable ex.
assertEquals(status, rse.getRpcStatusProto());
assertTrue(rse.getClass().getName() + " != RetriableException",
rse.getCause() instanceof RetriableException);
}
//
// Ensure that FairCallQueue properly implements BlockingQueue
//

View File

@ -1135,7 +1135,7 @@ public class TestRPC extends TestRpcBase {
return null;
}
}));
verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject());
verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
}
try {
proxy.sleep(null, newSleepRequest(100));
@ -1206,7 +1206,7 @@ public class TestRPC extends TestRpcBase {
return null;
}
}));
verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject());
verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
}
// Start another sleep RPC call and verify the call is backed off due to
// avg response time(3s) exceeds threshold (2s).