HADOOP-9194. RPC Support for QoS. (Junping Du via llu)

Merged from trunk@1461370.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1477388 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Luke Lu 2013-04-29 22:33:39 +00:00
parent 0d10e7a16f
commit 7e462d1423
5 changed files with 123 additions and 16 deletions

View File

@ -6,6 +6,8 @@ Release 2.0.5-beta - UNRELEASED
NEW FEATURES
HADOOP-9194. RPC support for QoS. (Junping Du via llu)
HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)
HADOOP-8415. Add getDouble() and setDouble() in

View File

@ -256,6 +256,7 @@ public class Client {
private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method
private Token<? extends TokenIdentifier> token;
private int serviceClass;
private SaslRpcClient saslRpcClient;
private Socket socket = null; // connected socket
@ -278,7 +279,7 @@ public class Client {
private final Object sendParamsLock = new Object();
public Connection(ConnectionId remoteId) throws IOException {
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
this.remoteId = remoteId;
this.server = remoteId.getAddress();
if (server.isUnresolved()) {
@ -295,6 +296,7 @@ public class Client {
this.tcpNoDelay = remoteId.getTcpNoDelay();
this.doPing = remoteId.getDoPing();
this.pingInterval = remoteId.getPingInterval();
this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
}
@ -740,7 +742,9 @@ public class Client {
* +----------------------------------+
* | "hrpc" 4 bytes |
* +----------------------------------+
* | Version (1 bytes) |
* | Version (1 byte) |
* +----------------------------------+
* | Service Class (1 byte) |
* +----------------------------------+
* | Authmethod (1 byte) |
* +----------------------------------+
@ -753,6 +757,7 @@ public class Client {
// Write out the header, version and authentication method
out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION);
out.write(serviceClass);
authMethod.write(out);
Server.IpcSerializationType.PROTOBUF.write(out);
out.flush();
@ -1144,19 +1149,33 @@ public class Client {
/**
* Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
* Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)}
* except that rpcKind is writable.
*/
public Writable call(Writable param, InetSocketAddress addr,
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, Configuration conf)
int rpcTimeout, Configuration conf)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
}
/**
* Same as {@link #call(Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)}
* except that specifying serviceClass.
*/
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, int serviceClass, Configuration conf)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
}
/**
* Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
@ -1183,10 +1202,10 @@ public class Client {
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
}
/**
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc respond.
*
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
@ -1196,8 +1215,26 @@ public class Client {
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId) throws InterruptedException, IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc respond.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param serviceClass - service class for RPC
* @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass)
throws InterruptedException, IOException {
Call call = new Call(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call);
Connection connection = getConnection(remoteId, call, serviceClass);
try {
connection.sendParam(call); // send the parameter
} catch (RejectedExecutionException e) {
@ -1254,7 +1291,7 @@ public class Client {
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
Call call)
Call call, int serviceClass)
throws IOException, InterruptedException {
if (!running.get()) {
// the client is stopped
@ -1269,7 +1306,7 @@ public class Client {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId);
connection = new Connection(remoteId, serviceClass);
connections.put(remoteId, connection);
}
}

View File

@ -73,12 +73,12 @@ import com.google.protobuf.BlockingService;
* the protocol instance is transmitted.
*/
public class RPC {
final static int RPC_SERVICE_CLASS_DEFAULT = 0;
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
private static final short FIRST_INDEX = RPC_BUILTIN.value;
public final short value; //TODO make it private
RpcKind(short val) {

View File

@ -431,6 +431,11 @@ public abstract class Server {
return Arrays.asList(handlers);
}
@VisibleForTesting
List<Connection> getConnections() {
return connectionList;
}
/**
* Refresh the service authorization ACL for the service handled by this server.
*/
@ -1097,6 +1102,7 @@ public abstract class Server {
private ByteBuffer connectionHeaderBuf = null;
private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer;
private int serviceClass;
UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before auth
@ -1308,14 +1314,17 @@ public abstract class Server {
if (!connectionHeaderRead) {
//Every connection is expected to send the header.
if (connectionHeaderBuf == null) {
connectionHeaderBuf = ByteBuffer.allocate(3);
connectionHeaderBuf = ByteBuffer.allocate(4);
}
count = channelRead(channel, connectionHeaderBuf);
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
return count;
}
int version = connectionHeaderBuf.get(0);
byte[] method = new byte[] {connectionHeaderBuf.get(1)};
// TODO we should add handler for service class later
this.setServiceClass(connectionHeaderBuf.get(1));
byte[] method = new byte[] {connectionHeaderBuf.get(2)};
authMethod = AuthMethod.read(new DataInputStream(
new ByteArrayInputStream(method)));
dataLengthBuffer.flip();
@ -1339,7 +1348,7 @@ public abstract class Server {
}
IpcSerializationType serializationType = IpcSerializationType
.fromByte(connectionHeaderBuf.get(2));
.fromByte(connectionHeaderBuf.get(3));
if (serializationType != IpcSerializationType.PROTOBUF) {
respondUnsupportedSerialization(serializationType);
return -1;
@ -1723,6 +1732,22 @@ public abstract class Server {
return true;
}
/**
* Get service class for connection
* @return the serviceClass
*/
public int getServiceClass() {
return serviceClass;
}
/**
* Set service class for connection
* @param serviceClass the serviceClass to set
*/
public void setServiceClass(int serviceClass) {
this.serviceClass = serviceClass;
}
private synchronized void close() throws IOException {
disposeSasl();
data = null;

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
@ -520,11 +521,53 @@ public class TestIPC {
}
}
/**
* Check service class byte in IPC header is correct on wire.
*/
@Test(timeout=60000)
public void testIpcWithServiceClass() throws Exception {
// start server
Server server = new TestServer(5, false);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
// start client
Client.setConnectTimeout(conf, 10000);
callAndVerify(server, addr, 0, true);
// Service Class is low to -128 as byte on wire.
// -128 shouldn't be casted on wire but -129 should.
callAndVerify(server, addr, -128, true);
callAndVerify(server, addr, -129, false);
// Service Class is up to 127.
// 127 shouldn't be casted on wire but 128 should.
callAndVerify(server, addr, 127, true);
callAndVerify(server, addr, 128, false);
server.stop();
}
/**
* Make a call from a client and verify if header info is changed in server side
*/
private void callAndVerify(Server server, InetSocketAddress addr,
int serviceClass, boolean noChanged) throws Exception{
Client client = new Client(LongWritable.class, conf);
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
Connection connection = server.getConnections().get(0);
int serviceClass2 = connection.getServiceClass();
assertFalse(noChanged ^ serviceClass == serviceClass2);
client.stop();
}
/**
* Check that file descriptors aren't leaked by starting
* and stopping IPC servers.
*/
@Test
@Test(timeout=60000)
public void testSocketLeak() throws Exception {
Assume.assumeTrue(FD_DIR.exists()); // only run on Linux