diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 8b9b442302e..db1425fc78e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -505,6 +505,9 @@ Release 2.8.0 - UNRELEASED HADOOP-11827. Speed-up distcp buildListing() using threadpool (Zoran Dimitrijevic via raviprak) + HADOOP-10597. RPC Server signals backoff to clients when all request + queues are full. (Ming Ma via Arpit Agarwal) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 757549600bd..272146662ce 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -90,6 +90,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String IPC_CALLQUEUE_NAMESPACE = "ipc"; public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl"; public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl"; + public static final String IPC_BACKOFF_ENABLE = "backoff.enable"; + public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false; /** This is for specifying the implementation for the mappings from * hostnames to the racks they belong to diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index 27949d00713..1568bd6f9a2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -38,16 +38,19 @@ public class CallQueueManager { Class queneClass, Class elementClass) { return (Class>)queneClass; } - + private final boolean clientBackOffEnabled; + // Atomic refs point to active callQueue // We have two so we can better control swapping private final AtomicReference> putRef; private final AtomicReference> takeRef; public CallQueueManager(Class> backingClass, - int maxQueueSize, String namespace, Configuration conf) { + boolean clientBackOffEnabled, int maxQueueSize, String namespace, + Configuration conf) { BlockingQueue bq = createCallQueueInstance(backingClass, maxQueueSize, namespace, conf); + this.clientBackOffEnabled = clientBackOffEnabled; this.putRef = new AtomicReference>(bq); this.takeRef = new AtomicReference>(bq); LOG.info("Using callQueue " + backingClass); @@ -89,6 +92,10 @@ public class CallQueueManager { " could not be constructed."); } + boolean isClientBackoffEnabled() { + return clientBackOffEnabled; + } + /** * Insert e into the backing queue or block until we can. * If we block and the queue changes on us, we will insert while the @@ -98,6 +105,15 @@ public class CallQueueManager { putRef.get().put(e); } + /** + * Insert e into the backing queue. + * Return true if e is queued. + * Return false if the queue is full. + */ + public boolean offer(E e) throws InterruptedException { + return putRef.get().offer(e); + } + /** * Retrieve an E from the backing queue or block until we can. * Guaranteed to return an element from the current queue. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 9aa362ef022..5f1809a9139 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -503,6 +503,17 @@ public abstract class Server { callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf); } + /** + * Get from config if client backoff is enabled on that port. + */ + static boolean getClientBackoffEnable( + String prefix, Configuration conf) { + String name = prefix + "." + + CommonConfigurationKeys.IPC_BACKOFF_ENABLE; + return conf.getBoolean(name, + CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT); + } + /** A call queued for handling. */ public static class Call implements Schedulable { private final int callId; // the client's call id @@ -1962,10 +1973,31 @@ public abstract class Server { rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray(), traceSpan); - callQueue.put(call); // queue the call; maybe blocked here + if (callQueue.isClientBackoffEnabled()) { + // if RPC queue is full, we will ask the RPC client to back off by + // throwing RetriableException. Whether RPC client will honor + // RetriableException and retry depends on client ipc retry policy. + // For example, FailoverOnNetworkExceptionRetry handles + // RetriableException. + queueRequestOrAskClientToBackOff(call); + } else { + callQueue.put(call); // queue the call; maybe blocked here + } incRpcCount(); // Increment the rpc count } + private void queueRequestOrAskClientToBackOff(Call call) + throws WrappedRpcServerException, InterruptedException { + // If rpc queue is full, we will ask the client to back off. + boolean isCallQueued = callQueue.offer(call); + if (!isCallQueued) { + rpcMetrics.incrClientBackoff(); + RetriableException retriableException = + new RetriableException("Server is too busy."); + throw new WrappedRpcServerException( + RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException); + } + } /** * Establish RPC connection setup by negotiating SASL if required, then @@ -2293,7 +2325,7 @@ public abstract class Server { // Setup appropriate callqueue final String prefix = getQueueClassPrefix(); this.callQueue = new CallQueueManager(getQueueClass(prefix, conf), - maxQueueSize, prefix, conf); + getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf); this.secretManager = (SecretManager) secretManager; this.authorize = diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index 5eba44a58f6..e90e51674c8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -95,6 +95,8 @@ public class RpcMetrics { MutableCounterLong rpcAuthorizationFailures; @Metric("Number of authorization sucesses") MutableCounterLong rpcAuthorizationSuccesses; + @Metric("Number of client backoff requests") + MutableCounterLong rpcClientBackoff; @Metric("Number of open connections") public int numOpenConnections() { return server.getNumOpenConnections(); @@ -192,4 +194,12 @@ public class RpcMetrics { } } } + + /** + * One client backoff event + */ + //@Override + public void incrClientBackoff() { + rpcClientBackoff.incr(); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index 1b618b1ce8b..6e1838e2915 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -143,21 +143,21 @@ public class TestCallQueueManager { @Test public void testCallQueueCapacity() throws InterruptedException { - manager = new CallQueueManager(queueClass, 10, "", null); + manager = new CallQueueManager(queueClass, false, 10, "", null); assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity } @Test public void testEmptyConsume() throws InterruptedException { - manager = new CallQueueManager(queueClass, 10, "", null); + manager = new CallQueueManager(queueClass, false, 10, "", null); assertCanTake(manager, 0, 1); // Fails since it's empty } @Test(timeout=60000) public void testSwapUnderContention() throws InterruptedException { - manager = new CallQueueManager(queueClass, 5000, "", null); + manager = new CallQueueManager(queueClass, false, 5000, "", null); ArrayList producers = new ArrayList(); ArrayList consumers = new ArrayList(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 2db8522b5b6..f0493957e85 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -1081,6 +1081,64 @@ public class TestRPC { } } + /** + * Test RPC backoff. + */ + @Test (timeout=30000) + public void testClientBackOff() throws Exception { + boolean succeeded = false; + final int numClients = 2; + final List> res = new ArrayList>(); + final ExecutorService executorService = + Executors.newFixedThreadPool(numClients); + final Configuration conf = new Configuration(); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + + ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); + final Server server = new RPC.Builder(conf) + .setProtocol(TestProtocol.class).setInstance(new TestImpl()) + .setBindAddress(ADDRESS).setPort(0) + .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) + .build(); + server.start(); + + final TestProtocol proxy = + RPC.getProxy(TestProtocol.class, TestProtocol.versionID, + NetUtils.getConnectAddress(server), conf); + try { + // start a sleep RPC call to consume the only handler thread. + // Start another sleep RPC call to make callQueue full. + // Start another sleep RPC call to make reader thread block on CallQueue. + for (int i = 0; i < numClients; i++) { + res.add(executorService.submit( + new Callable() { + @Override + public Void call() throws IOException, InterruptedException { + proxy.sleep(100000); + return null; + } + })); + } + while (server.getCallQueueLen() != 1 + && countThreads(CallQueueManager.class.getName()) != 1) { + Thread.sleep(100); + } + try { + proxy.sleep(100); + } catch (RemoteException e) { + IOException unwrapExeption = e.unwrapRemoteException(); + if (unwrapExeption instanceof RetriableException) { + succeeded = true; + } + } + } finally { + server.stop(); + RPC.stopProxy(proxy); + executorService.shutdown(); + } + assertTrue("RetriableException not received", succeeded); + } + public static void main(String[] args) throws IOException { new TestRPC().testCallsInternal(conf);