From e6ee3e25971f710225edf3474d48618fc9503f99 Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Tue, 1 Nov 2016 22:24:40 -0700 Subject: [PATCH] HADOOP-10300. Allowed deferred sending of call responses. Contributed by Daryn Sharp. --- .../hadoop-common/CHANGES.txt | 3 + .../java/org/apache/hadoop/ipc/Server.java | 76 +++++++-- .../hadoop/ipc/TestIPCServerResponder.java | 161 ++++++++++++++++-- 3 files changed, 212 insertions(+), 28 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index acf7324ce5a..b7e866fc16e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -15,6 +15,9 @@ Release 2.7.4 - UNRELEASED HADOOP-12325. RPC Metrics : Add the ability track and log slow RPCs. (Anu Engineer via xyao) + HADOOP-10300. Allowed deferred sending of call responses. (Daryn Sharp via + yliu) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index defd1961706..6be12d404af 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -575,6 +575,7 @@ public abstract class Server { private long timestamp; // time received when response is null // time served when response is not null private ByteBuffer rpcResponse; // the response for this call + private AtomicInteger responseWaitCount = new AtomicInteger(1); private final RPC.RpcKind rpcKind; private final byte[] clientId; private final Span traceSpan; // the tracing span on the server side @@ -609,10 +610,47 @@ public abstract class Server { + retryCount; } + public void setResponse(Throwable t) throws IOException { + setupResponse(new ByteArrayOutputStream(), this, + RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER, + null, t.getClass().getName(), StringUtils.stringifyException(t)); + } + public void setResponse(ByteBuffer response) { this.rpcResponse = response; } + /** + * Allow a IPC response to be postponed instead of sent immediately + * after the handler returns from the proxy method. The intended use + * case is freeing up the handler thread when the response is known, + * but an expensive pre-condition must be satisfied before it's sent + * to the client. + */ + @InterfaceStability.Unstable + @InterfaceAudience.LimitedPrivate({"HDFS"}) + public void postponeResponse() { + int count = responseWaitCount.incrementAndGet(); + assert count > 0 : "response has already been sent"; + } + + @InterfaceStability.Unstable + @InterfaceAudience.LimitedPrivate({"HDFS"}) + public void sendResponse() throws IOException { + int count = responseWaitCount.decrementAndGet(); + assert count >= 0 : "response has already been sent"; + if (count == 0) { + if (rpcResponse == null) { + // needed by postponed operations to indicate an exception has + // occurred. it's too late to re-encode the response so just + // drop the connection. + connection.close(); + } else { + connection.sendResponse(this); + } + } + } + // For Schedulable @Override public UserGroupInformation getUserGroupInformation() { @@ -1241,10 +1279,6 @@ public abstract class Server { RpcConstants.INVALID_RETRY_COUNT, null, this); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); - private final Call saslCall = new Call(AuthProtocol.SASL.callId, - RpcConstants.INVALID_RETRY_COUNT, null, this); - private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream(); - private boolean sentNegotiate = false; private boolean useWrap = false; @@ -1514,24 +1548,27 @@ public abstract class Server { } return response.build(); } - + private void doSaslReply(Message message) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Sending sasl message "+message); } + final Call saslCall = new Call(AuthProtocol.SASL.callId, + RpcConstants.INVALID_RETRY_COUNT, null, this); + final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream(); setupResponse(saslResponse, saslCall, RpcStatusProto.SUCCESS, null, new RpcResponseWrapper(message), null, null); - responder.doRespond(saslCall); + saslCall.sendResponse(); } - + private void doSaslReply(Exception ioe) throws IOException { setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED, null, ioe.getClass().getName(), ioe.getLocalizedMessage()); - responder.doRespond(authFailedCall); + authFailedCall.sendResponse(); } - + private void disposeSasl() { if (saslServer != null) { try { @@ -1707,7 +1744,7 @@ public abstract class Server { setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH, null, VersionMismatch.class.getName(), errMsg); - responder.doRespond(fakeCall); + fakeCall.sendResponse(); } else if (clientVersion >= 3) { Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null, this); @@ -1715,7 +1752,7 @@ public abstract class Server { setupResponseOldVersionFatal(buffer, fakeCall, null, VersionMismatch.class.getName(), errMsg); - responder.doRespond(fakeCall); + fakeCall.sendResponse(); } else if (clientVersion == 2) { // Hadoop 0.18.3 Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this); @@ -1725,8 +1762,7 @@ public abstract class Server { WritableUtils.writeString(out, VersionMismatch.class.getName()); WritableUtils.writeString(out, errMsg); fakeCall.setResponse(ByteBuffer.wrap(buffer.toByteArray())); - - responder.doRespond(fakeCall); + fakeCall.sendResponse(); } } @@ -1734,7 +1770,7 @@ public abstract class Server { Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this); fakeCall.setResponse(ByteBuffer.wrap( RECEIVED_HTTP_REQ_RESPONSE.getBytes(Charsets.UTF_8))); - responder.doRespond(fakeCall); + fakeCall.sendResponse(); } /** Reads the connection context following the connection header @@ -1874,7 +1910,7 @@ public abstract class Server { setupResponse(authFailedResponse, call, RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null, ioe.getClass().getName(), ioe.getMessage()); - responder.doRespond(call); + call.sendResponse(); throw wrse; } } @@ -2073,6 +2109,10 @@ public abstract class Server { } } + private void sendResponse(Call call) throws IOException { + responder.doRespond(call); + } + /** * Get service class for connection * @return the serviceClass @@ -2208,7 +2248,7 @@ public abstract class Server { + call.toString()); buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); } - responder.doRespond(call); + call.sendResponse(); } } catch (InterruptedException e) { if (running) { // unexpected -- log it @@ -2407,7 +2447,7 @@ public abstract class Server { * @param error error message, if the call failed * @throws IOException */ - private void setupResponse(ByteArrayOutputStream responseBuf, + private static void setupResponse(ByteArrayOutputStream responseBuf, Call call, RpcStatusProto status, RpcErrorCodeProto erCode, Writable rv, String errorClass, String error) throws IOException { @@ -2503,7 +2543,7 @@ public abstract class Server { } - private void wrapWithSasl(ByteArrayOutputStream response, Call call) + private static void wrapWithSasl(ByteArrayOutputStream response, Call call) throws IOException { if (call.connection.saslServer != null) { byte[] token = response.toByteArray(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java index 427c82a2bf1..a3bf995e5d4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java @@ -18,35 +18,43 @@ package org.apache.hadoop.ipc; +import static org.junit.Assert.*; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.Random; - -import junit.framework.TestCase; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.net.NetUtils; +import org.junit.Assert; +import org.junit.Test; /** * This test provokes partial writes in the server, which is * serving multiple clients. */ -public class TestIPCServerResponder extends TestCase { +public class TestIPCServerResponder { public static final Log LOG = LogFactory.getLog(TestIPCServerResponder.class); private static Configuration conf = new Configuration(); - public TestIPCServerResponder(final String name) { - super(name); - } - private static final Random RANDOM = new Random(); private static final String ADDRESS = "0.0.0.0"; @@ -115,21 +123,23 @@ public class TestIPCServerResponder extends TestCase { } } + @Test public void testResponseBuffer() throws IOException, InterruptedException { Server.INITIAL_RESP_BUF_SIZE = 1; conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, 1); - testServerResponder(1, true, 1, 1, 5); + checkServerResponder(1, true, 1, 1, 5); conf = new Configuration(); // reset configuration } + @Test public void testServerResponder() throws IOException, InterruptedException { - testServerResponder(10, true, 1, 10, 200); + checkServerResponder(10, true, 1, 10, 200); } - public void testServerResponder(final int handlerCount, + public void checkServerResponder(final int handlerCount, final boolean handlerSleep, final int clientCount, final int callerCount, @@ -159,4 +169,135 @@ public class TestIPCServerResponder extends TestCase { server.stop(); } + // Test that IPC calls can be marked for a deferred response. + // call 0: immediate + // call 1: immediate + // call 2: delayed with wait for 1 sendResponse, check if blocked + // call 3: immediate, proves handler is freed + // call 4: delayed with wait for 2 sendResponses, check if blocked + // call 2: sendResponse, should return + // call 4: sendResponse, should remain blocked + // call 5: immediate, prove handler is still free + // call 4: sendResponse, expect it to return + @Test(timeout=10000) + public void testDeferResponse() throws IOException, InterruptedException { + final AtomicReference deferredCall = new AtomicReference(); + final AtomicInteger count = new AtomicInteger(); + final Writable wait0 = new IntWritable(0); + final Writable wait1 = new IntWritable(1); + final Writable wait2 = new IntWritable(2); + + // use only 1 handler to prove it's freed after every call + Server server = new Server(ADDRESS, 0, IntWritable.class, 1, conf){ + @Override + public Writable call(RPC.RpcKind rpcKind, String protocol, + Writable waitCount, long receiveTime) throws IOException { + Call call = Server.getCurCall().get(); + int wait = ((IntWritable)waitCount).get(); + while (wait-- > 0) { + call.postponeResponse(); + deferredCall.set(call); + } + return new IntWritable(count.getAndIncrement()); + } + }; + server.start(); + + final InetSocketAddress address = NetUtils.getConnectAddress(server); + final Client client = new Client(IntWritable.class, conf); + Call[] waitingCalls = new Call[2]; + + // calls should return immediately, check the sequence number is + // increasing + assertEquals(0, + ((IntWritable)client.call(wait0, address)).get()); + assertEquals(1, + ((IntWritable)client.call(wait0, address)).get()); + + // do a call in the background that will have a deferred response + final ExecutorService exec = Executors.newCachedThreadPool(); + Future future1 = exec.submit(new Callable() { + @Override + public Integer call() throws IOException { + return ((IntWritable)client.call(wait1, address)).get(); + } + }); + // make sure it blocked + try { + future1.get(1, TimeUnit.SECONDS); + Assert.fail("ipc shouldn't have responded"); + } catch (TimeoutException te) { + // ignore, expected + } catch (Exception ex) { + Assert.fail("unexpected exception:"+ex); + } + assertFalse(future1.isDone()); + waitingCalls[0] = deferredCall.get(); + assertNotNull(waitingCalls[0]); + + // proves the handler isn't tied up, and that the prior sequence number + // was consumed + assertEquals(3, + ((IntWritable)client.call(wait0, address)).get()); + + // another call with wait count of 2 + Future future2 = exec.submit(new Callable() { + @Override + public Integer call() throws IOException { + return ((IntWritable)client.call(wait2, address)).get(); + } + }); + // make sure it blocked + try { + future2.get(1, TimeUnit.SECONDS); + Assert.fail("ipc shouldn't have responded"); + } catch (TimeoutException te) { + // ignore, expected + } catch (Exception ex) { + Assert.fail("unexpected exception:"+ex); + } + assertFalse(future2.isDone()); + waitingCalls[1] = deferredCall.get(); + assertNotNull(waitingCalls[1]); + + // the background calls should still be blocked + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + // trigger responses + waitingCalls[0].sendResponse(); + waitingCalls[1].sendResponse(); + try { + int val = future1.get(1, TimeUnit.SECONDS); + assertEquals(2, val); + } catch (Exception ex) { + Assert.fail("unexpected exception:"+ex); + } + + // make sure it's still blocked + try { + future2.get(1, TimeUnit.SECONDS); + Assert.fail("ipc shouldn't have responded"); + } catch (TimeoutException te) { + // ignore, expected + } catch (Exception ex) { + Assert.fail("unexpected exception:"+ex); + } + assertFalse(future2.isDone()); + + // call should return immediately + assertEquals(5, + ((IntWritable)client.call(wait0, address)).get()); + + // trigger last waiting call + waitingCalls[1].sendResponse(); + try { + int val = future2.get(1, TimeUnit.SECONDS); + assertEquals(4, val); + } catch (Exception ex) { + Assert.fail("unexpected exception:"+ex); + } + + server.stop(); + } }