From 5319818487d5c139de06155834deecb18c10b7a1 Mon Sep 17 00:00:00 2001 From: Luke Lu Date: Tue, 26 Mar 2013 23:29:09 +0000 Subject: [PATCH] HADOOP-9194. RPC Support for QoS. (Junping Du via llu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461370 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../java/org/apache/hadoop/ipc/Client.java | 59 +++++++++++++++---- .../main/java/org/apache/hadoop/ipc/RPC.java | 2 +- .../java/org/apache/hadoop/ipc/Server.java | 31 +++++++++- .../java/org/apache/hadoop/ipc/TestIPC.java | 45 +++++++++++++- 5 files changed, 123 insertions(+), 16 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ad2b336fb38..a1ac0440be1 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -17,6 +17,8 @@ Trunk (Unreleased) HADOOP-9380 Add totalLength to rpc response (sanjay Radia) + HADOOP-9194. RPC Support for QoS. (Junping Du via llu) + NEW FEATURES HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 5294aa3b94f..986b1a2c154 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -257,6 +257,7 @@ public class Client { private final ConnectionId remoteId; // connection id private AuthMethod authMethod; // authentication method private Token token; + private int serviceClass; private SaslRpcClient saslRpcClient; private Socket socket = null; // connected socket @@ -279,7 +280,7 @@ public class Client { private final Object sendRpcRequestLock = new Object(); - public Connection(ConnectionId remoteId) throws IOException { + public Connection(ConnectionId remoteId, int serviceClass) throws IOException { this.remoteId = remoteId; this.server = remoteId.getAddress(); if (server.isUnresolved()) { @@ -296,6 +297,7 @@ public class Client { this.tcpNoDelay = remoteId.getTcpNoDelay(); this.doPing = remoteId.getDoPing(); this.pingInterval = remoteId.getPingInterval(); + this.serviceClass = serviceClass; if (LOG.isDebugEnabled()) { LOG.debug("The ping interval is " + this.pingInterval + " ms."); } @@ -747,7 +749,9 @@ public class Client { * +----------------------------------+ * | "hrpc" 4 bytes | * +----------------------------------+ - * | Version (1 bytes) | + * | Version (1 byte) | + * +----------------------------------+ + * | Service Class (1 byte) | * +----------------------------------+ * | Authmethod (1 byte) | * +----------------------------------+ @@ -760,6 +764,7 @@ public class Client { // Write out the header, version and authentication method out.write(Server.HEADER.array()); out.write(Server.CURRENT_VERSION); + out.write(serviceClass); authMethod.write(out); Server.IpcSerializationType.PROTOBUF.write(out); out.flush(); @@ -1179,19 +1184,33 @@ public class Client { /** - * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress, + * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress, * Class, UserGroupInformation, int, Configuration)} * except that rpcKind is writable. */ - public Writable call(Writable param, InetSocketAddress addr, + public Writable call(Writable param, InetSocketAddress addr, Class protocol, UserGroupInformation ticket, - int rpcTimeout, Configuration conf) + int rpcTimeout, Configuration conf) throws InterruptedException, IOException { - ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, + ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, conf); return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId); } + /** + * Same as {@link #call(Writable, InetSocketAddress, + * Class, UserGroupInformation, int, Configuration)} + * except that specifying serviceClass. + */ + public Writable call(Writable param, InetSocketAddress addr, + Class protocol, UserGroupInformation ticket, + int rpcTimeout, int serviceClass, Configuration conf) + throws InterruptedException, IOException { + ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, + ticket, rpcTimeout, conf); + return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass); + } + /** * Make a call, passing param, to the IPC server running at * address which is servicing the protocol protocol, @@ -1218,10 +1237,10 @@ public class Client { return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId); } - /** + /** * Make a call, passing rpcRequest, to the IPC server defined by * remoteId, returning the rpc respond. - * + * * @param rpcKind * @param rpcRequest - contains serialized method and method parameters * @param remoteId - the target rpc server @@ -1231,8 +1250,26 @@ public class Client { */ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId) throws InterruptedException, IOException { + return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT); + } + + /** + * Make a call, passing rpcRequest, to the IPC server defined by + * remoteId, returning the rpc respond. + * + * @param rpcKind + * @param rpcRequest - contains serialized method and method parameters + * @param remoteId - the target rpc server + * @param serviceClass - service class for RPC + * @returns the rpc response + * Throws exceptions if there are network problems or if the remote code + * threw an exception. + */ + public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, + ConnectionId remoteId, int serviceClass) + throws InterruptedException, IOException { Call call = new Call(rpcKind, rpcRequest); - Connection connection = getConnection(remoteId, call); + Connection connection = getConnection(remoteId, call, serviceClass); try { connection.sendRpcRequest(call); // send the rpc request } catch (RejectedExecutionException e) { @@ -1289,7 +1326,7 @@ public class Client { /** Get a connection from the pool, or create a new one and add it to the * pool. Connections to a given ConnectionId are reused. */ private Connection getConnection(ConnectionId remoteId, - Call call) + Call call, int serviceClass) throws IOException, InterruptedException { if (!running.get()) { // the client is stopped @@ -1304,7 +1341,7 @@ public class Client { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { - connection = new Connection(remoteId); + connection = new Connection(remoteId, serviceClass); connections.put(remoteId, connection); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 8ee22aa415a..3563e07dbad 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -77,12 +77,12 @@ import com.google.protobuf.BlockingService; @InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" }) @InterfaceStability.Evolving public class RPC { + final static int RPC_SERVICE_CLASS_DEFAULT = 0; public enum RpcKind { RPC_BUILTIN ((short) 1), // Used for built in calls by tests RPC_WRITABLE ((short) 2), // Use WritableRpcEngine RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size - private static final short FIRST_INDEX = RPC_BUILTIN.value; public final short value; //TODO make it private RpcKind(short val) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index c44eb9426d3..f76690c27f5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -438,6 +438,11 @@ public abstract class Server { return Arrays.asList(handlers); } + @VisibleForTesting + List getConnections() { + return connectionList; + } + /** * Refresh the service authorization ACL for the service handled by this server. */ @@ -1104,6 +1109,7 @@ public abstract class Server { private ByteBuffer connectionHeaderBuf = null; private ByteBuffer unwrappedData; private ByteBuffer unwrappedDataLengthBuffer; + private int serviceClass; UserGroupInformation user = null; public UserGroupInformation attemptingUser = null; // user name before auth @@ -1314,14 +1320,17 @@ public abstract class Server { if (!connectionHeaderRead) { //Every connection is expected to send the header. if (connectionHeaderBuf == null) { - connectionHeaderBuf = ByteBuffer.allocate(3); + connectionHeaderBuf = ByteBuffer.allocate(4); } count = channelRead(channel, connectionHeaderBuf); if (count < 0 || connectionHeaderBuf.remaining() > 0) { return count; } int version = connectionHeaderBuf.get(0); - byte[] method = new byte[] {connectionHeaderBuf.get(1)}; + // TODO we should add handler for service class later + this.setServiceClass(connectionHeaderBuf.get(1)); + + byte[] method = new byte[] {connectionHeaderBuf.get(2)}; authMethod = AuthMethod.read(new DataInputStream( new ByteArrayInputStream(method))); dataLengthBuffer.flip(); @@ -1345,7 +1354,7 @@ public abstract class Server { } IpcSerializationType serializationType = IpcSerializationType - .fromByte(connectionHeaderBuf.get(2)); + .fromByte(connectionHeaderBuf.get(3)); if (serializationType != IpcSerializationType.PROTOBUF) { respondUnsupportedSerialization(serializationType); return -1; @@ -1735,6 +1744,22 @@ public abstract class Server { return true; } + /** + * Get service class for connection + * @return the serviceClass + */ + public int getServiceClass() { + return serviceClass; + } + + /** + * Set service class for connection + * @param serviceClass the serviceClass to set + */ + public void setServiceClass(int serviceClass) { + this.serviceClass = serviceClass; + } + private synchronized void close() throws IOException { disposeSasl(); data = null; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 3847bfd0814..24b20f89efc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -25,6 +25,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; @@ -520,11 +521,53 @@ public class TestIPC { } } + /** + * Check service class byte in IPC header is correct on wire. + */ + @Test(timeout=60000) + public void testIpcWithServiceClass() throws Exception { + // start server + Server server = new TestServer(5, false); + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + + // start client + Client.setConnectTimeout(conf, 10000); + + callAndVerify(server, addr, 0, true); + // Service Class is low to -128 as byte on wire. + // -128 shouldn't be casted on wire but -129 should. + callAndVerify(server, addr, -128, true); + callAndVerify(server, addr, -129, false); + + // Service Class is up to 127. + // 127 shouldn't be casted on wire but 128 should. + callAndVerify(server, addr, 127, true); + callAndVerify(server, addr, 128, false); + + server.stop(); + } + + /** + * Make a call from a client and verify if header info is changed in server side + */ + private void callAndVerify(Server server, InetSocketAddress addr, + int serviceClass, boolean noChanged) throws Exception{ + Client client = new Client(LongWritable.class, conf); + + client.call(new LongWritable(RANDOM.nextLong()), + addr, null, null, MIN_SLEEP_TIME, serviceClass, conf); + Connection connection = server.getConnections().get(0); + int serviceClass2 = connection.getServiceClass(); + assertFalse(noChanged ^ serviceClass == serviceClass2); + client.stop(); + } + /** * Check that file descriptors aren't leaked by starting * and stopping IPC servers. */ - @Test + @Test(timeout=60000) public void testSocketLeak() throws Exception { Assume.assumeTrue(FD_DIR.exists()); // only run on Linux