From 779d604bfb2fe4bd18b2ba086d3590eea5ea5fc6 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Wed, 11 Jan 2012 07:04:06 +0000 Subject: [PATCH] svn merge -c 1197885 from trunk for HADOOP-7776. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1229909 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../java/org/apache/hadoop/ipc/Client.java | 104 ++++++++++----- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 3 +- .../apache/hadoop/ipc/RpcPayloadHeader.java | 118 ++++++++++++++++++ .../java/org/apache/hadoop/ipc/Server.java | 76 ++++++----- .../apache/hadoop/ipc/WritableRpcEngine.java | 3 +- 6 files changed, 241 insertions(+), 65 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 9b41e2690cd..b0f8346ca42 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -21,6 +21,8 @@ Release 0.23-PB - Unreleased HADOOP-7716 RPC protocol registration on SS does not log the protocol name (only the class which may be different) (sanjay) + HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay) + Release 0.23.1 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 0acb8f8af90..d0adcb3fed8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -50,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.RpcPayloadHeader.*; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -153,16 +154,20 @@ public class Client { return refCount==0; } - /** A call waiting for a value. */ + /** + * Class that represents an RPC call + */ private class Call { - int id; // call id - Writable param; // parameter - Writable value; // value, null if error - IOException error; // exception, null if value - boolean done; // true when call is done + final int id; // call id + final Writable rpcRequest; // the serialized rpc request - RpcPayload + Writable rpcResponse; // null if rpc has error + IOException error; // exception, null if success + final RpcKind rpcKind; // Rpc EngineKind + boolean done; // true when call is done - protected Call(Writable param) { - this.param = param; + protected Call(RpcKind rpcKind, Writable param) { + this.rpcKind = rpcKind; + this.rpcRequest = param; synchronized (Client.this) { this.id = counter++; } @@ -188,15 +193,15 @@ public class Client { /** Set the return value when there is no error. * Notify the caller the call is done. * - * @param value return value of the call. + * @param rpcResponse return value of the rpc call. */ - public synchronized void setValue(Writable value) { - this.value = value; + public synchronized void setRpcResponse(Writable rpcResponse) { + this.rpcResponse = rpcResponse; callComplete(); } - public synchronized Writable getValue() { - return value; + public synchronized Writable getRpcResult() { + return rpcResponse; } } @@ -728,6 +733,7 @@ public class Client { } } + @SuppressWarnings("unused") public InetSocketAddress getRemoteAddress() { return server; } @@ -788,8 +794,10 @@ public class Client { //for serializing the //data to be written d = new DataOutputBuffer(); - d.writeInt(call.id); - call.param.write(d); + RpcPayloadHeader header = new RpcPayloadHeader( + call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id); + header.write(d); + call.rpcRequest.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); //first put the data length @@ -826,7 +834,7 @@ public class Client { if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value - call.setValue(value); + call.setRpcResponse(value); calls.remove(id); } else if (state == Status.ERROR.state) { call.setException(new RemoteException(WritableUtils.readString(in), @@ -910,7 +918,7 @@ public class Client { private int index; public ParallelCall(Writable param, ParallelResults results, int index) { - super(param); + super(RpcKind.RPC_WRITABLE, param); this.results = results; this.index = index; } @@ -934,7 +942,7 @@ public class Client { /** Collect a result. */ public synchronized void callComplete(ParallelCall call) { - values[call.index] = call.getValue(); // store the value + values[call.index] = call.getRpcResult(); // store the value count++; // count it if (count == size) // if all values are in notify(); // then notify waiting caller @@ -994,15 +1002,23 @@ public class Client { } } + /** + * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable + */ + public Writable call(Writable param, InetSocketAddress address) + throws InterruptedException, IOException { + return call(RpcKind.RPC_WRITABLE, param, address); + + } /** Make a call, passing param, to the IPC server running at * address, returning the value. Throws exceptions if there are * network problems or if the remote code threw an exception. - * @deprecated Use {@link #call(Writable, ConnectionId)} instead + * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead */ @Deprecated - public Writable call(Writable param, InetSocketAddress address) + public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address) throws InterruptedException, IOException { - return call(param, address, null); + return call(rpcKind, param, address, null); } /** Make a call, passing param, to the IPC server running at @@ -1010,15 +1026,15 @@ public class Client { * the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. - * @deprecated Use {@link #call(Writable, ConnectionId)} instead + * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead */ @Deprecated - public Writable call(Writable param, InetSocketAddress addr, + public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, UserGroupInformation ticket) throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0, conf); - return call(param, remoteId); + return call(rpcKind, param, remoteId); } /** Make a call, passing param, to the IPC server running at @@ -1027,18 +1043,33 @@ public class Client { * timeout, returning the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. - * @deprecated Use {@link #call(Writable, ConnectionId)} instead + * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead */ @Deprecated - public Writable call(Writable param, InetSocketAddress addr, + public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout) throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, conf); - return call(param, remoteId); + return call(rpcKind, param, remoteId); } + + /** + * Same as {@link #call(RpcKind, Writable, InetSocketAddress, + * Class, UserGroupInformation, int, Configuration)} + * except that rpcKind is writable. + */ + public Writable call(Writable param, InetSocketAddress addr, + Class protocol, UserGroupInformation ticket, + int rpcTimeout, Configuration conf) + throws InterruptedException, IOException { + ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, + ticket, rpcTimeout, conf); + return call(RpcKind.RPC_WRITABLE, param, remoteId); + } + /** * Make a call, passing param, to the IPC server running at * address which is servicing the protocol protocol, @@ -1047,22 +1078,31 @@ public class Client { * value. Throws exceptions if there are network problems or if the remote * code threw an exception. */ - public Writable call(Writable param, InetSocketAddress addr, + public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout, Configuration conf) throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, conf); - return call(param, remoteId); + return call(rpcKind, param, remoteId); + } + + /** + * Same as {link {@link #call(RpcKind, Writable, ConnectionId)} + * except the rpcKind is RPC_WRITABLE + */ + public Writable call(Writable param, ConnectionId remoteId) + throws InterruptedException, IOException { + return call(RpcKind.RPC_WRITABLE, param, remoteId); } /** Make a call, passing param, to the IPC server defined by * remoteId, returning the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. */ - public Writable call(Writable param, ConnectionId remoteId) + public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId) throws InterruptedException, IOException { - Call call = new Call(param); + Call call = new Call(rpcKind, param); Connection connection = getConnection(remoteId, call); connection.sendParam(call); // send the parameter boolean interrupted = false; @@ -1094,7 +1134,7 @@ public class Client { call.error); } } else { - return call.value; + return call.rpcResponse; } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index aec56a9d57a..dad94227ca2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto; @@ -139,7 +140,7 @@ public class ProtobufRpcEngine implements RpcEngine { HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args); RpcResponseWritable val = null; try { - val = (RpcResponseWritable) client.call( + val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER, new RpcRequestWritable(rpcRequest), remoteId); } catch (Exception e) { RpcClientException ce = new RpcClientException("Client exception", e); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java new file mode 100644 index 00000000000..1b62f0caa7f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java @@ -0,0 +1,118 @@ +package org.apache.hadoop.ipc; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * This is the rpc payload header. It is sent with every rpc call + *
+ * The format of RPC call is as follows:
+ * +---------------------------------------------------+
+ * |  Rpc length in bytes (header + payload length)    |
+ * +---------------------------------------------------+
+ * |      Rpc Header       |       Rpc Payload         |
+ * +---------------------------------------------------+
+ * 
+ * The format of Rpc Header is:
+ * +----------------------------------+
+ * |  RpcKind (1 bytes)               |      
+ * +----------------------------------+
+ * |  RpcPayloadOperation (1 bytes)   |      
+ * +----------------------------------+
+ * |  Call ID (4 bytes)               |      
+ * +----------------------------------+
+ * 
+ * {@link RpcKind} determines the type of serialization used for Rpc Payload.
+ * 
+ *

+ * Note this header does NOT have its own version number, + * it used the version number from the connection header. + */ +public class RpcPayloadHeader implements Writable { + public enum RpcPayloadOperation { + RPC_FINAL_PAYLOAD ((short)1), + RPC_CONTINUATION_PAYLOAD ((short)2), // not implemented yet + RPC_CLOSE_CONNECTION ((short)3); // close the rpc connection + + private final short code; + private static final short FIRST_INDEX = RPC_FINAL_PAYLOAD.code; + RpcPayloadOperation(short val) { + this.code = val; + } + + public void write(DataOutput out) throws IOException { + out.writeByte(code); + } + + static RpcPayloadOperation readFields(DataInput in) throws IOException { + short inValue = in.readByte(); + return RpcPayloadOperation.values()[inValue - FIRST_INDEX]; + } + } + + public enum RpcKind { + RPC_BUILTIN ((short ) 1), // Used for built in calls + RPC_WRITABLE ((short ) 2), + RPC_PROTOCOL_BUFFER ((short)3), + RPC_AVRO ((short)4); + + private final short value; + private static final short FIRST_INDEX = RPC_BUILTIN.value; + RpcKind(short val) { + this.value = val; + } + + public void write(DataOutput out) throws IOException { + out.writeByte(value); + } + + static RpcKind readFields(DataInput in) throws IOException { + short inValue = in.readByte(); + return RpcKind.values()[inValue - FIRST_INDEX]; + } + } + + private RpcKind kind; + private RpcPayloadOperation operation; + private int callId; + + public RpcPayloadHeader() { + kind = RpcKind.RPC_WRITABLE; + operation = RpcPayloadOperation.RPC_CLOSE_CONNECTION; + } + + public RpcPayloadHeader(RpcKind kind, RpcPayloadOperation op, int callId) { + this.kind = kind; + this.operation = op; + this.callId = callId; + } + + int getCallId() { + return callId; + } + + RpcKind getkind() { + return kind; + } + + RpcPayloadOperation getOperation() { + return operation; + } + + @Override + public void write(DataOutput out) throws IOException { + kind.write(out); + operation.write(out); + out.writeInt(callId); + } + + @Override + public void readFields(DataInput in) throws IOException { + kind = RpcKind.readFields(in); + operation = RpcPayloadOperation.readFields(in); + this.callId = in.readInt(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 34819f79445..2b27fdf87a7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -62,11 +62,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.RpcPayloadHeader.*; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RPC.VersionMismatch; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation; import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.net.NetUtils; @@ -108,7 +110,8 @@ public abstract class Server { // 4 : Introduced SASL security layer // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal} // in ObjectWritable to efficiently transmit arrays of primitives - public static final byte CURRENT_VERSION = 5; + // 6 : Made RPC payload header explicit + public static final byte CURRENT_VERSION = 6; /** * Initial and max size of response buffer @@ -261,28 +264,33 @@ public abstract class Server { /** A call queued for handling. */ private static class Call { - private int id; // the client's call id - private Writable param; // the parameter passed - private Connection connection; // connection to client - private long timestamp; // the time received when response is null - // the time served when response is not null - private ByteBuffer response; // the response for this call + private final int callId; // the client's call id + private final Writable rpcRequest; // Serialized Rpc request from client + private final Connection connection; // connection to client + private long timestamp; // time received when response is null + // time served when response is not null + private ByteBuffer rpcResponse; // the response for this call + private final RpcKind rpcKind; - public Call(int id, Writable param, Connection connection) { - this.id = id; - this.param = param; + public Call(int id, Writable param, Connection connection) { + this( id, param, connection, RpcKind.RPC_BUILTIN ); + } + public Call(int id, Writable param, Connection connection, RpcKind kind) { + this.callId = id; + this.rpcRequest = param; this.connection = connection; this.timestamp = System.currentTimeMillis(); - this.response = null; + this.rpcResponse = null; + this.rpcKind = kind; } @Override public String toString() { - return param.toString() + " from " + connection.toString(); + return rpcRequest.toString() + " from " + connection.toString(); } public void setResponse(ByteBuffer response) { - this.response = response; + this.rpcResponse = response; } } @@ -781,17 +789,17 @@ public abstract class Server { call = responseQueue.removeFirst(); SocketChannel channel = call.connection.channel; if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": responding to #" + call.id + " from " + + LOG.debug(getName() + ": responding to #" + call.callId + " from " + call.connection); } // // Send as much data as we can in the non-blocking fashion // - int numBytes = channelWrite(channel, call.response); + int numBytes = channelWrite(channel, call.rpcResponse); if (numBytes < 0) { return true; } - if (!call.response.hasRemaining()) { + if (!call.rpcResponse.hasRemaining()) { call.connection.decRpcCount(); if (numElements == 1) { // last call fully processes. done = true; // no more data for this channel. @@ -799,7 +807,7 @@ public abstract class Server { done = false; // more calls pending to be sent. } if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": responding to #" + call.id + " from " + + LOG.debug(getName() + ": responding to #" + call.callId + " from " + call.connection + " Wrote " + numBytes + " bytes."); } } else { @@ -827,7 +835,7 @@ public abstract class Server { } } if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": responding to #" + call.id + " from " + + LOG.debug(getName() + ": responding to #" + call.callId + " from " + call.connection + " Wrote partial " + numBytes + " bytes."); } @@ -1377,18 +1385,24 @@ public abstract class Server { private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); - int id = dis.readInt(); // try to read an id + RpcPayloadHeader header = new RpcPayloadHeader(); + header.readFields(dis); // Read the RpcPayload header if (LOG.isDebugEnabled()) - LOG.debug(" got #" + id); - Writable param; - try { - param = ReflectionUtils.newInstance(paramClass, conf);//read param - param.readFields(dis); + LOG.debug(" got #" + header.getCallId()); + if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) { + throw new IOException("IPC Server does not implement operation" + + header.getOperation()); + } + Writable rpcRequest; + try { //Read the rpc request + rpcRequest = ReflectionUtils.newInstance(paramClass, conf); + rpcRequest.readFields(dis); } catch (Throwable t) { LOG.warn("Unable to read call parameters for client " + getHostAddress(), t); - final Call readParamsFailedCall = new Call(id, null, this); + final Call readParamsFailedCall = + new Call(header.getCallId(), null, this); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, @@ -1398,7 +1412,7 @@ public abstract class Server { return; } - Call call = new Call(id, param, this); + Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind()); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } @@ -1462,8 +1476,8 @@ public abstract class Server { final Call call = callQueue.take(); // pop the queue; maybe blocked here if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": has #" + call.id + " from " + - call.connection); + LOG.debug(getName() + ": has Call#" + call.callId + + "for RpcKind " + call.rpcKind + " from " + call.connection); String errorClass = null; String error = null; @@ -1474,7 +1488,7 @@ public abstract class Server { // Make the call as the user via Subject.doAs, thus associating // the call with the Subject if (call.connection.user == null) { - value = call(call.connection.protocolName, call.param, + value = call(call.connection.protocolName, call.rpcRequest, call.timestamp); } else { value = @@ -1484,7 +1498,7 @@ public abstract class Server { public Writable run() throws Exception { // make the call return call(call.connection.protocolName, - call.param, call.timestamp); + call.rpcRequest, call.timestamp); } } @@ -1634,7 +1648,7 @@ public abstract class Server { throws IOException { response.reset(); DataOutputStream out = new DataOutputStream(response); - out.writeInt(call.id); // write call id + out.writeInt(call.callId); // write call id out.writeInt(status.state); // write status if (status == Status.SUCCESS) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 314bfac5824..48842237d4c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -39,6 +39,7 @@ import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -242,7 +243,7 @@ public class WritableRpcEngine implements RpcEngine { } ObjectWritable value = (ObjectWritable) - client.call(new Invocation(method, args), remoteId); + client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); if (LOG.isDebugEnabled()) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime);