HADOOP-10597. RPC Server signals backoff to clients when all request queues are full. (Contributed by Ming Ma)
This commit is contained in:
parent
1cd2fcf25d
commit
edbeefdb05
|
@ -58,6 +58,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-11827. Speed-up distcp buildListing() using threadpool
|
HADOOP-11827. Speed-up distcp buildListing() using threadpool
|
||||||
(Zoran Dimitrijevic via raviprak)
|
(Zoran Dimitrijevic via raviprak)
|
||||||
|
|
||||||
|
HADOOP-10597. RPC Server signals backoff to clients when all request
|
||||||
|
queues are full. (Ming Ma via Arpit Agarwal)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||||
|
|
|
@ -90,6 +90,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
||||||
public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
|
public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
|
||||||
public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
|
public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
|
||||||
public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
|
public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
|
||||||
|
public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
|
||||||
|
public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
|
||||||
|
|
||||||
/** This is for specifying the implementation for the mappings from
|
/** This is for specifying the implementation for the mappings from
|
||||||
* hostnames to the racks they belong to
|
* hostnames to the racks they belong to
|
||||||
|
|
|
@ -38,6 +38,7 @@ public class CallQueueManager<E> {
|
||||||
Class<?> queneClass, Class<E> elementClass) {
|
Class<?> queneClass, Class<E> elementClass) {
|
||||||
return (Class<? extends BlockingQueue<E>>)queneClass;
|
return (Class<? extends BlockingQueue<E>>)queneClass;
|
||||||
}
|
}
|
||||||
|
private final boolean clientBackOffEnabled;
|
||||||
|
|
||||||
// Atomic refs point to active callQueue
|
// Atomic refs point to active callQueue
|
||||||
// We have two so we can better control swapping
|
// We have two so we can better control swapping
|
||||||
|
@ -45,9 +46,11 @@ public class CallQueueManager<E> {
|
||||||
private final AtomicReference<BlockingQueue<E>> takeRef;
|
private final AtomicReference<BlockingQueue<E>> takeRef;
|
||||||
|
|
||||||
public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
|
public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
|
||||||
int maxQueueSize, String namespace, Configuration conf) {
|
boolean clientBackOffEnabled, int maxQueueSize, String namespace,
|
||||||
|
Configuration conf) {
|
||||||
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
|
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
|
||||||
maxQueueSize, namespace, conf);
|
maxQueueSize, namespace, conf);
|
||||||
|
this.clientBackOffEnabled = clientBackOffEnabled;
|
||||||
this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
|
this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
|
||||||
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
|
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
|
||||||
LOG.info("Using callQueue " + backingClass);
|
LOG.info("Using callQueue " + backingClass);
|
||||||
|
@ -89,6 +92,10 @@ public class CallQueueManager<E> {
|
||||||
" could not be constructed.");
|
" could not be constructed.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean isClientBackoffEnabled() {
|
||||||
|
return clientBackOffEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Insert e into the backing queue or block until we can.
|
* Insert e into the backing queue or block until we can.
|
||||||
* If we block and the queue changes on us, we will insert while the
|
* If we block and the queue changes on us, we will insert while the
|
||||||
|
@ -98,6 +105,15 @@ public class CallQueueManager<E> {
|
||||||
putRef.get().put(e);
|
putRef.get().put(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Insert e into the backing queue.
|
||||||
|
* Return true if e is queued.
|
||||||
|
* Return false if the queue is full.
|
||||||
|
*/
|
||||||
|
public boolean offer(E e) throws InterruptedException {
|
||||||
|
return putRef.get().offer(e);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve an E from the backing queue or block until we can.
|
* Retrieve an E from the backing queue or block until we can.
|
||||||
* Guaranteed to return an element from the current queue.
|
* Guaranteed to return an element from the current queue.
|
||||||
|
|
|
@ -500,6 +500,17 @@ public abstract class Server {
|
||||||
callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
|
callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get from config if client backoff is enabled on that port.
|
||||||
|
*/
|
||||||
|
static boolean getClientBackoffEnable(
|
||||||
|
String prefix, Configuration conf) {
|
||||||
|
String name = prefix + "." +
|
||||||
|
CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
|
||||||
|
return conf.getBoolean(name,
|
||||||
|
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
/** A call queued for handling. */
|
/** A call queued for handling. */
|
||||||
public static class Call implements Schedulable {
|
public static class Call implements Schedulable {
|
||||||
private final int callId; // the client's call id
|
private final int callId; // the client's call id
|
||||||
|
@ -1877,10 +1888,31 @@ public abstract class Server {
|
||||||
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
|
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
|
||||||
header.getClientId().toByteArray(), traceSpan);
|
header.getClientId().toByteArray(), traceSpan);
|
||||||
|
|
||||||
callQueue.put(call); // queue the call; maybe blocked here
|
if (callQueue.isClientBackoffEnabled()) {
|
||||||
|
// if RPC queue is full, we will ask the RPC client to back off by
|
||||||
|
// throwing RetriableException. Whether RPC client will honor
|
||||||
|
// RetriableException and retry depends on client ipc retry policy.
|
||||||
|
// For example, FailoverOnNetworkExceptionRetry handles
|
||||||
|
// RetriableException.
|
||||||
|
queueRequestOrAskClientToBackOff(call);
|
||||||
|
} else {
|
||||||
|
callQueue.put(call); // queue the call; maybe blocked here
|
||||||
|
}
|
||||||
incRpcCount(); // Increment the rpc count
|
incRpcCount(); // Increment the rpc count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void queueRequestOrAskClientToBackOff(Call call)
|
||||||
|
throws WrappedRpcServerException, InterruptedException {
|
||||||
|
// If rpc queue is full, we will ask the client to back off.
|
||||||
|
boolean isCallQueued = callQueue.offer(call);
|
||||||
|
if (!isCallQueued) {
|
||||||
|
rpcMetrics.incrClientBackoff();
|
||||||
|
RetriableException retriableException =
|
||||||
|
new RetriableException("Server is too busy.");
|
||||||
|
throw new WrappedRpcServerException(
|
||||||
|
RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Establish RPC connection setup by negotiating SASL if required, then
|
* Establish RPC connection setup by negotiating SASL if required, then
|
||||||
|
@ -2207,7 +2239,7 @@ public abstract class Server {
|
||||||
// Setup appropriate callqueue
|
// Setup appropriate callqueue
|
||||||
final String prefix = getQueueClassPrefix();
|
final String prefix = getQueueClassPrefix();
|
||||||
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
|
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
|
||||||
maxQueueSize, prefix, conf);
|
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
|
||||||
|
|
||||||
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
|
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
|
||||||
this.authorize =
|
this.authorize =
|
||||||
|
|
|
@ -95,6 +95,8 @@ public class RpcMetrics {
|
||||||
MutableCounterLong rpcAuthorizationFailures;
|
MutableCounterLong rpcAuthorizationFailures;
|
||||||
@Metric("Number of authorization sucesses")
|
@Metric("Number of authorization sucesses")
|
||||||
MutableCounterLong rpcAuthorizationSuccesses;
|
MutableCounterLong rpcAuthorizationSuccesses;
|
||||||
|
@Metric("Number of client backoff requests")
|
||||||
|
MutableCounterLong rpcClientBackoff;
|
||||||
|
|
||||||
@Metric("Number of open connections") public int numOpenConnections() {
|
@Metric("Number of open connections") public int numOpenConnections() {
|
||||||
return server.getNumOpenConnections();
|
return server.getNumOpenConnections();
|
||||||
|
@ -192,4 +194,12 @@ public class RpcMetrics {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* One client backoff event
|
||||||
|
*/
|
||||||
|
//@Override
|
||||||
|
public void incrClientBackoff() {
|
||||||
|
rpcClientBackoff.incr();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -143,21 +143,21 @@ public class TestCallQueueManager {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCallQueueCapacity() throws InterruptedException {
|
public void testCallQueueCapacity() throws InterruptedException {
|
||||||
manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
|
manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
|
||||||
|
|
||||||
assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
|
assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyConsume() throws InterruptedException {
|
public void testEmptyConsume() throws InterruptedException {
|
||||||
manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
|
manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
|
||||||
|
|
||||||
assertCanTake(manager, 0, 1); // Fails since it's empty
|
assertCanTake(manager, 0, 1); // Fails since it's empty
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testSwapUnderContention() throws InterruptedException {
|
public void testSwapUnderContention() throws InterruptedException {
|
||||||
manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null);
|
manager = new CallQueueManager<FakeCall>(queueClass, false, 5000, "", null);
|
||||||
|
|
||||||
ArrayList<Putter> producers = new ArrayList<Putter>();
|
ArrayList<Putter> producers = new ArrayList<Putter>();
|
||||||
ArrayList<Taker> consumers = new ArrayList<Taker>();
|
ArrayList<Taker> consumers = new ArrayList<Taker>();
|
||||||
|
|
|
@ -1077,6 +1077,64 @@ public class TestRPC {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test RPC backoff.
|
||||||
|
*/
|
||||||
|
@Test (timeout=30000)
|
||||||
|
public void testClientBackOff() throws Exception {
|
||||||
|
boolean succeeded = false;
|
||||||
|
final int numClients = 2;
|
||||||
|
final List<Future<Void>> res = new ArrayList<Future<Void>>();
|
||||||
|
final ExecutorService executorService =
|
||||||
|
Executors.newFixedThreadPool(numClients);
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
||||||
|
conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE +
|
||||||
|
".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
|
||||||
|
final Server server = new RPC.Builder(conf)
|
||||||
|
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
|
||||||
|
.build();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final TestProtocol proxy =
|
||||||
|
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
|
||||||
|
NetUtils.getConnectAddress(server), conf);
|
||||||
|
try {
|
||||||
|
// start a sleep RPC call to consume the only handler thread.
|
||||||
|
// Start another sleep RPC call to make callQueue full.
|
||||||
|
// Start another sleep RPC call to make reader thread block on CallQueue.
|
||||||
|
for (int i = 0; i < numClients; i++) {
|
||||||
|
res.add(executorService.submit(
|
||||||
|
new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws IOException, InterruptedException {
|
||||||
|
proxy.sleep(100000);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
while (server.getCallQueueLen() != 1
|
||||||
|
&& countThreads(CallQueueManager.class.getName()) != 1) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
proxy.sleep(100);
|
||||||
|
} catch (RemoteException e) {
|
||||||
|
IOException unwrapExeption = e.unwrapRemoteException();
|
||||||
|
if (unwrapExeption instanceof RetriableException) {
|
||||||
|
succeeded = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
server.stop();
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
assertTrue("RetriableException not received", succeeded);
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
new TestRPC().testCallsInternal(conf);
|
new TestRPC().testCallsInternal(conf);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue