diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ec11d208c73..f559cbb9cc9 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -55,6 +55,8 @@ Trunk (unreleased changes) HADOOP-7792. Add verifyToken method to AbstractDelegationTokenSecretManager. (jitendra) + HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay) + BUGS HADOOP-7606. Upgrade Jackson to version 1.7.1 to match the version required diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 58cf8101864..55e8a23d483 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -48,6 +48,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RpcPayloadHeader.*; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -152,16 +153,20 @@ synchronized boolean isZeroReference() { return refCount==0; } - /** A call waiting for a value. */ + /** + * Class that represents an RPC call + */ private class Call { - int id; // call id - Writable param; // parameter - Writable value; // value, null if error - IOException error; // exception, null if value - boolean done; // true when call is done + final int id; // call id + final Writable rpcRequest; // the serialized rpc request - RpcPayload + Writable rpcResponse; // null if rpc has error + IOException error; // exception, null if success + final RpcKind rpcKind; // Rpc EngineKind + boolean done; // true when call is done - protected Call(Writable param) { - this.param = param; + protected Call(RpcKind rpcKind, Writable param) { + this.rpcKind = rpcKind; + this.rpcRequest = param; synchronized (Client.this) { this.id = counter++; } @@ -187,15 +192,15 @@ public synchronized void setException(IOException error) { /** Set the return value when there is no error. * Notify the caller the call is done. * - * @param value return value of the call. + * @param rpcResponse return value of the rpc call. */ - public synchronized void setValue(Writable value) { - this.value = value; + public synchronized void setRpcResponse(Writable rpcResponse) { + this.rpcResponse = rpcResponse; callComplete(); } - public synchronized Writable getValue() { - return value; + public synchronized Writable getRpcResult() { + return rpcResponse; } } @@ -727,6 +732,7 @@ private synchronized boolean waitForWork() { } } + @SuppressWarnings("unused") public InetSocketAddress getRemoteAddress() { return server; } @@ -787,8 +793,10 @@ public void sendParam(Call call) { //for serializing the //data to be written d = new DataOutputBuffer(); - d.writeInt(call.id); - call.param.write(d); + RpcPayloadHeader header = new RpcPayloadHeader( + call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id); + header.write(d); + call.rpcRequest.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); //first put the data length @@ -825,7 +833,7 @@ private void receiveResponse() { if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value - call.setValue(value); + call.setRpcResponse(value); calls.remove(id); } else if (state == Status.ERROR.state) { call.setException(new RemoteException(WritableUtils.readString(in), @@ -909,7 +917,7 @@ private class ParallelCall extends Call { private int index; public ParallelCall(Writable param, ParallelResults results, int index) { - super(param); + super(RpcKind.RPC_WRITABLE, param); this.results = results; this.index = index; } @@ -933,7 +941,7 @@ public ParallelResults(int size) { /** Collect a result. */ public synchronized void callComplete(ParallelCall call) { - values[call.index] = call.getValue(); // store the value + values[call.index] = call.getRpcResult(); // store the value count++; // count it if (count == size) // if all values are in notify(); // then notify waiting caller @@ -993,15 +1001,23 @@ public void stop() { } } + /** + * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable + */ + public Writable call(Writable param, InetSocketAddress address) + throws InterruptedException, IOException { + return call(RpcKind.RPC_WRITABLE, param, address); + + } /** Make a call, passing param, to the IPC server running at * address, returning the value. Throws exceptions if there are * network problems or if the remote code threw an exception. - * @deprecated Use {@link #call(Writable, ConnectionId)} instead + * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead */ @Deprecated - public Writable call(Writable param, InetSocketAddress address) + public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address) throws InterruptedException, IOException { - return call(param, address, null); + return call(rpcKind, param, address, null); } /** Make a call, passing param, to the IPC server running at @@ -1009,15 +1025,15 @@ public Writable call(Writable param, InetSocketAddress address) * the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. - * @deprecated Use {@link #call(Writable, ConnectionId)} instead + * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead */ @Deprecated - public Writable call(Writable param, InetSocketAddress addr, + public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, UserGroupInformation ticket) throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0, conf); - return call(param, remoteId); + return call(rpcKind, param, remoteId); } /** Make a call, passing param, to the IPC server running at @@ -1026,18 +1042,33 @@ public Writable call(Writable param, InetSocketAddress addr, * timeout, returning the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. - * @deprecated Use {@link #call(Writable, ConnectionId)} instead + * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead */ @Deprecated - public Writable call(Writable param, InetSocketAddress addr, + public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout) throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, conf); - return call(param, remoteId); + return call(rpcKind, param, remoteId); } + + /** + * Same as {@link #call(RpcKind, Writable, InetSocketAddress, + * Class, UserGroupInformation, int, Configuration)} + * except that rpcKind is writable. + */ + public Writable call(Writable param, InetSocketAddress addr, + Class protocol, UserGroupInformation ticket, + int rpcTimeout, Configuration conf) + throws InterruptedException, IOException { + ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, + ticket, rpcTimeout, conf); + return call(RpcKind.RPC_WRITABLE, param, remoteId); + } + /** * Make a call, passing param, to the IPC server running at * address which is servicing the protocol protocol, @@ -1046,22 +1077,31 @@ public Writable call(Writable param, InetSocketAddress addr, * value. Throws exceptions if there are network problems or if the remote * code threw an exception. */ - public Writable call(Writable param, InetSocketAddress addr, + public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout, Configuration conf) throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, conf); - return call(param, remoteId); + return call(rpcKind, param, remoteId); + } + + /** + * Same as {link {@link #call(RpcKind, Writable, ConnectionId)} + * except the rpcKind is RPC_WRITABLE + */ + public Writable call(Writable param, ConnectionId remoteId) + throws InterruptedException, IOException { + return call(RpcKind.RPC_WRITABLE, param, remoteId); } /** Make a call, passing param, to the IPC server defined by * remoteId, returning the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. */ - public Writable call(Writable param, ConnectionId remoteId) + public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId) throws InterruptedException, IOException { - Call call = new Call(param); + Call call = new Call(rpcKind, param); Connection connection = getConnection(remoteId, call); connection.sendParam(call); // send the parameter boolean interrupted = false; @@ -1093,7 +1133,7 @@ public Writable call(Writable param, ConnectionId remoteId) call.error); } } else { - return call.value; + return call.rpcResponse; } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index aec56a9d57a..dad94227ca2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto; @@ -139,7 +140,7 @@ public Object invoke(Object proxy, Method method, Object[] args) HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args); RpcResponseWritable val = null; try { - val = (RpcResponseWritable) client.call( + val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER, new RpcRequestWritable(rpcRequest), remoteId); } catch (Exception e) { RpcClientException ce = new RpcClientException("Client exception", e); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java new file mode 100644 index 00000000000..1b62f0caa7f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java @@ -0,0 +1,118 @@ +package org.apache.hadoop.ipc; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * This is the rpc payload header. It is sent with every rpc call + *
+ * The format of RPC call is as follows:
+ * +---------------------------------------------------+
+ * |  Rpc length in bytes (header + payload length)    |
+ * +---------------------------------------------------+
+ * |      Rpc Header       |       Rpc Payload         |
+ * +---------------------------------------------------+
+ * 
+ * The format of Rpc Header is:
+ * +----------------------------------+
+ * |  RpcKind (1 bytes)               |      
+ * +----------------------------------+
+ * |  RpcPayloadOperation (1 bytes)   |      
+ * +----------------------------------+
+ * |  Call ID (4 bytes)               |      
+ * +----------------------------------+
+ * 
+ * {@link RpcKind} determines the type of serialization used for Rpc Payload.
+ * 
+ *

+ * Note this header does NOT have its own version number, + * it used the version number from the connection header. + */ +public class RpcPayloadHeader implements Writable { + public enum RpcPayloadOperation { + RPC_FINAL_PAYLOAD ((short)1), + RPC_CONTINUATION_PAYLOAD ((short)2), // not implemented yet + RPC_CLOSE_CONNECTION ((short)3); // close the rpc connection + + private final short code; + private static final short FIRST_INDEX = RPC_FINAL_PAYLOAD.code; + RpcPayloadOperation(short val) { + this.code = val; + } + + public void write(DataOutput out) throws IOException { + out.writeByte(code); + } + + static RpcPayloadOperation readFields(DataInput in) throws IOException { + short inValue = in.readByte(); + return RpcPayloadOperation.values()[inValue - FIRST_INDEX]; + } + } + + public enum RpcKind { + RPC_BUILTIN ((short ) 1), // Used for built in calls + RPC_WRITABLE ((short ) 2), + RPC_PROTOCOL_BUFFER ((short)3), + RPC_AVRO ((short)4); + + private final short value; + private static final short FIRST_INDEX = RPC_BUILTIN.value; + RpcKind(short val) { + this.value = val; + } + + public void write(DataOutput out) throws IOException { + out.writeByte(value); + } + + static RpcKind readFields(DataInput in) throws IOException { + short inValue = in.readByte(); + return RpcKind.values()[inValue - FIRST_INDEX]; + } + } + + private RpcKind kind; + private RpcPayloadOperation operation; + private int callId; + + public RpcPayloadHeader() { + kind = RpcKind.RPC_WRITABLE; + operation = RpcPayloadOperation.RPC_CLOSE_CONNECTION; + } + + public RpcPayloadHeader(RpcKind kind, RpcPayloadOperation op, int callId) { + this.kind = kind; + this.operation = op; + this.callId = callId; + } + + int getCallId() { + return callId; + } + + RpcKind getkind() { + return kind; + } + + RpcPayloadOperation getOperation() { + return operation; + } + + @Override + public void write(DataOutput out) throws IOException { + kind.write(out); + operation.write(out); + out.writeInt(callId); + } + + @Override + public void readFields(DataInput in) throws IOException { + kind = RpcKind.readFields(in); + operation = RpcPayloadOperation.readFields(in); + this.callId = in.readInt(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 78c0bc76a10..c04833f232d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -61,11 +61,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ipc.RpcPayloadHeader.*; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RPC.VersionMismatch; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation; import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.net.NetUtils; @@ -124,7 +126,8 @@ public abstract class Server { // 4 : Introduced SASL security layer // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal} // in ObjectWritable to efficiently transmit arrays of primitives - public static final byte CURRENT_VERSION = 5; + // 6 : Made RPC payload header explicit + public static final byte CURRENT_VERSION = 6; /** * Initial and max size of response buffer @@ -277,28 +280,33 @@ public ServiceAuthorizationManager getServiceAuthorizationManager() { /** A call queued for handling. */ private static class Call { - private int id; // the client's call id - private Writable param; // the parameter passed - private Connection connection; // connection to client - private long timestamp; // the time received when response is null - // the time served when response is not null - private ByteBuffer response; // the response for this call + private final int callId; // the client's call id + private final Writable rpcRequest; // Serialized Rpc request from client + private final Connection connection; // connection to client + private long timestamp; // time received when response is null + // time served when response is not null + private ByteBuffer rpcResponse; // the response for this call + private final RpcKind rpcKind; - public Call(int id, Writable param, Connection connection) { - this.id = id; - this.param = param; + public Call(int id, Writable param, Connection connection) { + this( id, param, connection, RpcKind.RPC_BUILTIN ); + } + public Call(int id, Writable param, Connection connection, RpcKind kind) { + this.callId = id; + this.rpcRequest = param; this.connection = connection; this.timestamp = System.currentTimeMillis(); - this.response = null; + this.rpcResponse = null; + this.rpcKind = kind; } @Override public String toString() { - return param.toString() + " from " + connection.toString(); + return rpcRequest.toString() + " from " + connection.toString(); } public void setResponse(ByteBuffer response) { - this.response = response; + this.rpcResponse = response; } } @@ -795,17 +803,17 @@ private boolean processResponse(LinkedList responseQueue, call = responseQueue.removeFirst(); SocketChannel channel = call.connection.channel; if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": responding to #" + call.id + " from " + + LOG.debug(getName() + ": responding to #" + call.callId + " from " + call.connection); } // // Send as much data as we can in the non-blocking fashion // - int numBytes = channelWrite(channel, call.response); + int numBytes = channelWrite(channel, call.rpcResponse); if (numBytes < 0) { return true; } - if (!call.response.hasRemaining()) { + if (!call.rpcResponse.hasRemaining()) { call.connection.decRpcCount(); if (numElements == 1) { // last call fully processes. done = true; // no more data for this channel. @@ -813,7 +821,7 @@ private boolean processResponse(LinkedList responseQueue, done = false; // more calls pending to be sent. } if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": responding to #" + call.id + " from " + + LOG.debug(getName() + ": responding to #" + call.callId + " from " + call.connection + " Wrote " + numBytes + " bytes."); } } else { @@ -841,7 +849,7 @@ private boolean processResponse(LinkedList responseQueue, } } if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": responding to #" + call.id + " from " + + LOG.debug(getName() + ": responding to #" + call.callId + " from " + call.connection + " Wrote partial " + numBytes + " bytes."); } @@ -1408,18 +1416,24 @@ private void processOneRpc(byte[] buf) throws IOException, private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); - int id = dis.readInt(); // try to read an id + RpcPayloadHeader header = new RpcPayloadHeader(); + header.readFields(dis); // Read the RpcPayload header if (LOG.isDebugEnabled()) - LOG.debug(" got #" + id); - Writable param; - try { - param = ReflectionUtils.newInstance(paramClass, conf);//read param - param.readFields(dis); + LOG.debug(" got #" + header.getCallId()); + if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) { + throw new IOException("IPC Server does not implement operation" + + header.getOperation()); + } + Writable rpcRequest; + try { //Read the rpc request + rpcRequest = ReflectionUtils.newInstance(paramClass, conf); + rpcRequest.readFields(dis); } catch (Throwable t) { LOG.warn("Unable to read call parameters for client " + getHostAddress(), t); - final Call readParamsFailedCall = new Call(id, null, this); + final Call readParamsFailedCall = + new Call(header.getCallId(), null, this); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, @@ -1429,7 +1443,7 @@ private void processData(byte[] buf) throws IOException, InterruptedException { return; } - Call call = new Call(id, param, this); + Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind()); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } @@ -1493,8 +1507,8 @@ public void run() { final Call call = callQueue.take(); // pop the queue; maybe blocked here if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": has #" + call.id + " from " + - call.connection); + LOG.debug(getName() + ": has Call#" + call.callId + + "for RpcKind " + call.rpcKind + " from " + call.connection); String errorClass = null; String error = null; @@ -1505,7 +1519,7 @@ public void run() { // Make the call as the user via Subject.doAs, thus associating // the call with the Subject if (call.connection.user == null) { - value = call(call.connection.protocolName, call.param, + value = call(call.connection.protocolName, call.rpcRequest, call.timestamp); } else { value = @@ -1515,7 +1529,7 @@ public void run() { public Writable run() throws Exception { // make the call return call(call.connection.protocolName, - call.param, call.timestamp); + call.rpcRequest, call.timestamp); } } @@ -1657,7 +1671,7 @@ private void setupResponse(ByteArrayOutputStream response, throws IOException { response.reset(); DataOutputStream out = new DataOutputStream(response); - out.writeInt(call.id); // write call id + out.writeInt(call.callId); // write call id out.writeInt(status.state); // write status if (status == Status.SUCCESS) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 314bfac5824..48842237d4c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -39,6 +39,7 @@ import org.apache.commons.logging.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -242,7 +243,7 @@ public Object invoke(Object proxy, Method method, Object[] args) } ObjectWritable value = (ObjectWritable) - client.call(new Invocation(method, args), remoteId); + client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); if (LOG.isDebugEnabled()) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime);