HADOOP-10597. RPC Server signals backoff to clients when all request queues are full. (Contributed by Ming Ma)

This commit is contained in:
Arpit Agarwal 2015-04-23 09:35:04 -07:00
parent 189a63a719
commit 49f6e3d35e
7 changed files with 128 additions and 7 deletions

View File

@ -505,6 +505,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-11827. Speed-up distcp buildListing() using threadpool HADOOP-11827. Speed-up distcp buildListing() using threadpool
(Zoran Dimitrijevic via raviprak) (Zoran Dimitrijevic via raviprak)
HADOOP-10597. RPC Server signals backoff to clients when all request
queues are full. (Ming Ma via Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -90,6 +90,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String IPC_CALLQUEUE_NAMESPACE = "ipc"; public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl"; public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl"; public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
/** This is for specifying the implementation for the mappings from /** This is for specifying the implementation for the mappings from
* hostnames to the racks they belong to * hostnames to the racks they belong to

View File

@ -38,16 +38,19 @@ public class CallQueueManager<E> {
Class<?> queneClass, Class<E> elementClass) { Class<?> queneClass, Class<E> elementClass) {
return (Class<? extends BlockingQueue<E>>)queneClass; return (Class<? extends BlockingQueue<E>>)queneClass;
} }
private final boolean clientBackOffEnabled;
// Atomic refs point to active callQueue // Atomic refs point to active callQueue
// We have two so we can better control swapping // We have two so we can better control swapping
private final AtomicReference<BlockingQueue<E>> putRef; private final AtomicReference<BlockingQueue<E>> putRef;
private final AtomicReference<BlockingQueue<E>> takeRef; private final AtomicReference<BlockingQueue<E>> takeRef;
public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass, public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
int maxQueueSize, String namespace, Configuration conf) { boolean clientBackOffEnabled, int maxQueueSize, String namespace,
Configuration conf) {
BlockingQueue<E> bq = createCallQueueInstance(backingClass, BlockingQueue<E> bq = createCallQueueInstance(backingClass,
maxQueueSize, namespace, conf); maxQueueSize, namespace, conf);
this.clientBackOffEnabled = clientBackOffEnabled;
this.putRef = new AtomicReference<BlockingQueue<E>>(bq); this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq); this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
LOG.info("Using callQueue " + backingClass); LOG.info("Using callQueue " + backingClass);
@ -89,6 +92,10 @@ public class CallQueueManager<E> {
" could not be constructed."); " could not be constructed.");
} }
boolean isClientBackoffEnabled() {
return clientBackOffEnabled;
}
/** /**
* Insert e into the backing queue or block until we can. * Insert e into the backing queue or block until we can.
* If we block and the queue changes on us, we will insert while the * If we block and the queue changes on us, we will insert while the
@ -98,6 +105,15 @@ public class CallQueueManager<E> {
putRef.get().put(e); putRef.get().put(e);
} }
/**
* Insert e into the backing queue.
* Return true if e is queued.
* Return false if the queue is full.
*/
public boolean offer(E e) throws InterruptedException {
return putRef.get().offer(e);
}
/** /**
* Retrieve an E from the backing queue or block until we can. * Retrieve an E from the backing queue or block until we can.
* Guaranteed to return an element from the current queue. * Guaranteed to return an element from the current queue.

View File

@ -503,6 +503,17 @@ public abstract class Server {
callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf); callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
} }
/**
* Get from config if client backoff is enabled on that port.
*/
static boolean getClientBackoffEnable(
String prefix, Configuration conf) {
String name = prefix + "." +
CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
return conf.getBoolean(name,
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
}
/** A call queued for handling. */ /** A call queued for handling. */
public static class Call implements Schedulable { public static class Call implements Schedulable {
private final int callId; // the client's call id private final int callId; // the client's call id
@ -1962,10 +1973,31 @@ public abstract class Server {
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceSpan); header.getClientId().toByteArray(), traceSpan);
callQueue.put(call); // queue the call; maybe blocked here if (callQueue.isClientBackoffEnabled()) {
// if RPC queue is full, we will ask the RPC client to back off by
// throwing RetriableException. Whether RPC client will honor
// RetriableException and retry depends on client ipc retry policy.
// For example, FailoverOnNetworkExceptionRetry handles
// RetriableException.
queueRequestOrAskClientToBackOff(call);
} else {
callQueue.put(call); // queue the call; maybe blocked here
}
incRpcCount(); // Increment the rpc count incRpcCount(); // Increment the rpc count
} }
private void queueRequestOrAskClientToBackOff(Call call)
throws WrappedRpcServerException, InterruptedException {
// If rpc queue is full, we will ask the client to back off.
boolean isCallQueued = callQueue.offer(call);
if (!isCallQueued) {
rpcMetrics.incrClientBackoff();
RetriableException retriableException =
new RetriableException("Server is too busy.");
throw new WrappedRpcServerException(
RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
}
}
/** /**
* Establish RPC connection setup by negotiating SASL if required, then * Establish RPC connection setup by negotiating SASL if required, then
@ -2293,7 +2325,7 @@ public abstract class Server {
// Setup appropriate callqueue // Setup appropriate callqueue
final String prefix = getQueueClassPrefix(); final String prefix = getQueueClassPrefix();
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf), this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
maxQueueSize, prefix, conf); getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager; this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize = this.authorize =

View File

@ -95,6 +95,8 @@ public class RpcMetrics {
MutableCounterLong rpcAuthorizationFailures; MutableCounterLong rpcAuthorizationFailures;
@Metric("Number of authorization sucesses") @Metric("Number of authorization sucesses")
MutableCounterLong rpcAuthorizationSuccesses; MutableCounterLong rpcAuthorizationSuccesses;
@Metric("Number of client backoff requests")
MutableCounterLong rpcClientBackoff;
@Metric("Number of open connections") public int numOpenConnections() { @Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections(); return server.getNumOpenConnections();
@ -192,4 +194,12 @@ public class RpcMetrics {
} }
} }
} }
/**
* One client backoff event
*/
//@Override
public void incrClientBackoff() {
rpcClientBackoff.incr();
}
} }

View File

@ -143,21 +143,21 @@ public class TestCallQueueManager {
@Test @Test
public void testCallQueueCapacity() throws InterruptedException { public void testCallQueueCapacity() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null); manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
} }
@Test @Test
public void testEmptyConsume() throws InterruptedException { public void testEmptyConsume() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null); manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
assertCanTake(manager, 0, 1); // Fails since it's empty assertCanTake(manager, 0, 1); // Fails since it's empty
} }
@Test(timeout=60000) @Test(timeout=60000)
public void testSwapUnderContention() throws InterruptedException { public void testSwapUnderContention() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null); manager = new CallQueueManager<FakeCall>(queueClass, false, 5000, "", null);
ArrayList<Putter> producers = new ArrayList<Putter>(); ArrayList<Putter> producers = new ArrayList<Putter>();
ArrayList<Taker> consumers = new ArrayList<Taker>(); ArrayList<Taker> consumers = new ArrayList<Taker>();

View File

@ -1081,6 +1081,64 @@ public class TestRPC {
} }
} }
/**
* Test RPC backoff.
*/
@Test (timeout=30000)
public void testClientBackOff() throws Exception {
boolean succeeded = false;
final int numClients = 2;
final List<Future<Void>> res = new ArrayList<Future<Void>>();
final ExecutorService executorService =
Executors.newFixedThreadPool(numClients);
final Configuration conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE +
".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
final Server server = new RPC.Builder(conf)
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0)
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
.build();
server.start();
final TestProtocol proxy =
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
NetUtils.getConnectAddress(server), conf);
try {
// start a sleep RPC call to consume the only handler thread.
// Start another sleep RPC call to make callQueue full.
// Start another sleep RPC call to make reader thread block on CallQueue.
for (int i = 0; i < numClients; i++) {
res.add(executorService.submit(
new Callable<Void>() {
@Override
public Void call() throws IOException, InterruptedException {
proxy.sleep(100000);
return null;
}
}));
}
while (server.getCallQueueLen() != 1
&& countThreads(CallQueueManager.class.getName()) != 1) {
Thread.sleep(100);
}
try {
proxy.sleep(100);
} catch (RemoteException e) {
IOException unwrapExeption = e.unwrapRemoteException();
if (unwrapExeption instanceof RetriableException) {
succeeded = true;
}
}
} finally {
server.stop();
RPC.stopProxy(proxy);
executorService.shutdown();
}
assertTrue("RetriableException not received", succeeded);
}
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
new TestRPC().testCallsInternal(conf); new TestRPC().testCallsInternal(conf);