svn merge -c 1197885 from trunk for HADOOP-7776.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1229909 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-01-11 07:04:06 +00:00
parent 3cf4430876
commit 779d604bfb
6 changed files with 241 additions and 65 deletions

View File

@ -21,6 +21,8 @@ Release 0.23-PB - Unreleased
HADOOP-7716 RPC protocol registration on SS does not log the protocol name
(only the class which may be different) (sanjay)
HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay)
Release 0.23.1 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RpcPayloadHeader.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@ -153,16 +154,20 @@ public class Client {
return refCount==0;
}
/** A call waiting for a value. */
/**
* Class that represents an RPC call
*/
private class Call {
int id; // call id
Writable param; // parameter
Writable value; // value, null if error
IOException error; // exception, null if value
boolean done; // true when call is done
final int id; // call id
final Writable rpcRequest; // the serialized rpc request - RpcPayload
Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success
final RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done
protected Call(Writable param) {
this.param = param;
protected Call(RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
this.rpcRequest = param;
synchronized (Client.this) {
this.id = counter++;
}
@ -188,15 +193,15 @@ public class Client {
/** Set the return value when there is no error.
* Notify the caller the call is done.
*
* @param value return value of the call.
* @param rpcResponse return value of the rpc call.
*/
public synchronized void setValue(Writable value) {
this.value = value;
public synchronized void setRpcResponse(Writable rpcResponse) {
this.rpcResponse = rpcResponse;
callComplete();
}
public synchronized Writable getValue() {
return value;
public synchronized Writable getRpcResult() {
return rpcResponse;
}
}
@ -728,6 +733,7 @@ public class Client {
}
}
@SuppressWarnings("unused")
public InetSocketAddress getRemoteAddress() {
return server;
}
@ -788,8 +794,10 @@ public class Client {
//for serializing the
//data to be written
d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
RpcPayloadHeader header = new RpcPayloadHeader(
call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id);
header.write(d);
call.rpcRequest.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //first put the data length
@ -826,7 +834,7 @@ public class Client {
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
call.setValue(value);
call.setRpcResponse(value);
calls.remove(id);
} else if (state == Status.ERROR.state) {
call.setException(new RemoteException(WritableUtils.readString(in),
@ -910,7 +918,7 @@ public class Client {
private int index;
public ParallelCall(Writable param, ParallelResults results, int index) {
super(param);
super(RpcKind.RPC_WRITABLE, param);
this.results = results;
this.index = index;
}
@ -934,7 +942,7 @@ public class Client {
/** Collect a result. */
public synchronized void callComplete(ParallelCall call) {
values[call.index] = call.getValue(); // store the value
values[call.index] = call.getRpcResult(); // store the value
count++; // count it
if (count == size) // if all values are in
notify(); // then notify waiting caller
@ -994,15 +1002,23 @@ public class Client {
}
}
/**
* Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable
*/
public Writable call(Writable param, InetSocketAddress address)
throws InterruptedException, IOException {
return call(RpcKind.RPC_WRITABLE, param, address);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception.
* @deprecated Use {@link #call(Writable, ConnectionId)} instead
* @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
*/
@Deprecated
public Writable call(Writable param, InetSocketAddress address)
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
throws InterruptedException, IOException {
return call(param, address, null);
return call(rpcKind, param, address, null);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
@ -1010,15 +1026,15 @@ public class Client {
* the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @deprecated Use {@link #call(Writable, ConnectionId)} instead
* @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
*/
@Deprecated
public Writable call(Writable param, InetSocketAddress addr,
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
UserGroupInformation ticket)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
conf);
return call(param, remoteId);
return call(rpcKind, param, remoteId);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
@ -1027,18 +1043,33 @@ public class Client {
* timeout, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @deprecated Use {@link #call(Writable, ConnectionId)} instead
* @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
*/
@Deprecated
public Writable call(Writable param, InetSocketAddress addr,
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(param, remoteId);
return call(rpcKind, param, remoteId);
}
/**
* Same as {@link #call(RpcKind, Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)}
* except that rpcKind is writable.
*/
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, Configuration conf)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(RpcKind.RPC_WRITABLE, param, remoteId);
}
/**
* Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
@ -1047,22 +1078,31 @@ public class Client {
* value. Throws exceptions if there are network problems or if the remote
* code threw an exception.
*/
public Writable call(Writable param, InetSocketAddress addr,
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, Configuration conf)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(param, remoteId);
return call(rpcKind, param, remoteId);
}
/**
* Same as {link {@link #call(RpcKind, Writable, ConnectionId)}
* except the rpcKind is RPC_WRITABLE
*/
public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException {
return call(RpcKind.RPC_WRITABLE, param, remoteId);
}
/** Make a call, passing <code>param</code>, to the IPC server defined by
* <code>remoteId</code>, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception. */
public Writable call(Writable param, ConnectionId remoteId)
public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)
throws InterruptedException, IOException {
Call call = new Call(param);
Call call = new Call(rpcKind, param);
Connection connection = getConnection(remoteId, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
@ -1094,7 +1134,7 @@ public class Client {
call.error);
}
} else {
return call.value;
return call.rpcResponse;
}
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
@ -139,7 +140,7 @@ public class ProtobufRpcEngine implements RpcEngine {
HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
RpcResponseWritable val = null;
try {
val = (RpcResponseWritable) client.call(
val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWritable(rpcRequest), remoteId);
} catch (Exception e) {
RpcClientException ce = new RpcClientException("Client exception", e);

View File

@ -0,0 +1,118 @@
package org.apache.hadoop.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* This is the rpc payload header. It is sent with every rpc call
* <pre>
* The format of RPC call is as follows:
* +---------------------------------------------------+
* | Rpc length in bytes (header + payload length) |
* +---------------------------------------------------+
* | Rpc Header | Rpc Payload |
* +---------------------------------------------------+
*
* The format of Rpc Header is:
* +----------------------------------+
* | RpcKind (1 bytes) |
* +----------------------------------+
* | RpcPayloadOperation (1 bytes) |
* +----------------------------------+
* | Call ID (4 bytes) |
* +----------------------------------+
*
* {@link RpcKind} determines the type of serialization used for Rpc Payload.
* </pre>
* <p>
* <b>Note this header does NOT have its own version number,
* it used the version number from the connection header. </b>
*/
public class RpcPayloadHeader implements Writable {
public enum RpcPayloadOperation {
RPC_FINAL_PAYLOAD ((short)1),
RPC_CONTINUATION_PAYLOAD ((short)2), // not implemented yet
RPC_CLOSE_CONNECTION ((short)3); // close the rpc connection
private final short code;
private static final short FIRST_INDEX = RPC_FINAL_PAYLOAD.code;
RpcPayloadOperation(short val) {
this.code = val;
}
public void write(DataOutput out) throws IOException {
out.writeByte(code);
}
static RpcPayloadOperation readFields(DataInput in) throws IOException {
short inValue = in.readByte();
return RpcPayloadOperation.values()[inValue - FIRST_INDEX];
}
}
public enum RpcKind {
RPC_BUILTIN ((short ) 1), // Used for built in calls
RPC_WRITABLE ((short ) 2),
RPC_PROTOCOL_BUFFER ((short)3),
RPC_AVRO ((short)4);
private final short value;
private static final short FIRST_INDEX = RPC_BUILTIN.value;
RpcKind(short val) {
this.value = val;
}
public void write(DataOutput out) throws IOException {
out.writeByte(value);
}
static RpcKind readFields(DataInput in) throws IOException {
short inValue = in.readByte();
return RpcKind.values()[inValue - FIRST_INDEX];
}
}
private RpcKind kind;
private RpcPayloadOperation operation;
private int callId;
public RpcPayloadHeader() {
kind = RpcKind.RPC_WRITABLE;
operation = RpcPayloadOperation.RPC_CLOSE_CONNECTION;
}
public RpcPayloadHeader(RpcKind kind, RpcPayloadOperation op, int callId) {
this.kind = kind;
this.operation = op;
this.callId = callId;
}
int getCallId() {
return callId;
}
RpcKind getkind() {
return kind;
}
RpcPayloadOperation getOperation() {
return operation;
}
@Override
public void write(DataOutput out) throws IOException {
kind.write(out);
operation.write(out);
out.writeInt(callId);
}
@Override
public void readFields(DataInput in) throws IOException {
kind = RpcKind.readFields(in);
operation = RpcPayloadOperation.readFields(in);
this.callId = in.readInt();
}
}

View File

@ -62,11 +62,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RpcPayloadHeader.*;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.net.NetUtils;
@ -108,7 +110,8 @@ public abstract class Server {
// 4 : Introduced SASL security layer
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
// in ObjectWritable to efficiently transmit arrays of primitives
public static final byte CURRENT_VERSION = 5;
// 6 : Made RPC payload header explicit
public static final byte CURRENT_VERSION = 6;
/**
* Initial and max size of response buffer
@ -261,28 +264,33 @@ public abstract class Server {
/** A call queued for handling. */
private static class Call {
private int id; // the client's call id
private Writable param; // the parameter passed
private Connection connection; // connection to client
private long timestamp; // the time received when response is null
// the time served when response is not null
private ByteBuffer response; // the response for this call
private final int callId; // the client's call id
private final Writable rpcRequest; // Serialized Rpc request from client
private final Connection connection; // connection to client
private long timestamp; // time received when response is null
// time served when response is not null
private ByteBuffer rpcResponse; // the response for this call
private final RpcKind rpcKind;
public Call(int id, Writable param, Connection connection) {
this.id = id;
this.param = param;
public Call(int id, Writable param, Connection connection) {
this( id, param, connection, RpcKind.RPC_BUILTIN );
}
public Call(int id, Writable param, Connection connection, RpcKind kind) {
this.callId = id;
this.rpcRequest = param;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
this.rpcResponse = null;
this.rpcKind = kind;
}
@Override
public String toString() {
return param.toString() + " from " + connection.toString();
return rpcRequest.toString() + " from " + connection.toString();
}
public void setResponse(ByteBuffer response) {
this.response = response;
this.rpcResponse = response;
}
}
@ -781,17 +789,17 @@ public abstract class Server {
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " +
LOG.debug(getName() + ": responding to #" + call.callId + " from " +
call.connection);
}
//
// Send as much data as we can in the non-blocking fashion
//
int numBytes = channelWrite(channel, call.response);
int numBytes = channelWrite(channel, call.rpcResponse);
if (numBytes < 0) {
return true;
}
if (!call.response.hasRemaining()) {
if (!call.rpcResponse.hasRemaining()) {
call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.
@ -799,7 +807,7 @@ public abstract class Server {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " +
LOG.debug(getName() + ": responding to #" + call.callId + " from " +
call.connection + " Wrote " + numBytes + " bytes.");
}
} else {
@ -827,7 +835,7 @@ public abstract class Server {
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " +
LOG.debug(getName() + ": responding to #" + call.callId + " from " +
call.connection + " Wrote partial " + numBytes +
" bytes.");
}
@ -1377,18 +1385,24 @@ public abstract class Server {
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // try to read an id
RpcPayloadHeader header = new RpcPayloadHeader();
header.readFields(dis); // Read the RpcPayload header
if (LOG.isDebugEnabled())
LOG.debug(" got #" + id);
Writable param;
try {
param = ReflectionUtils.newInstance(paramClass, conf);//read param
param.readFields(dis);
LOG.debug(" got #" + header.getCallId());
if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) {
throw new IOException("IPC Server does not implement operation" +
header.getOperation());
}
Writable rpcRequest;
try { //Read the rpc request
rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
rpcRequest.readFields(dis);
} catch (Throwable t) {
LOG.warn("Unable to read call parameters for client " +
getHostAddress(), t);
final Call readParamsFailedCall = new Call(id, null, this);
final Call readParamsFailedCall =
new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
@ -1398,7 +1412,7 @@ public abstract class Server {
return;
}
Call call = new Call(id, param, this);
Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind());
callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}
@ -1462,8 +1476,8 @@ public abstract class Server {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " +
call.connection);
LOG.debug(getName() + ": has Call#" + call.callId +
"for RpcKind " + call.rpcKind + " from " + call.connection);
String errorClass = null;
String error = null;
@ -1474,7 +1488,7 @@ public abstract class Server {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
if (call.connection.user == null) {
value = call(call.connection.protocolName, call.param,
value = call(call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
@ -1484,7 +1498,7 @@ public abstract class Server {
public Writable run() throws Exception {
// make the call
return call(call.connection.protocolName,
call.param, call.timestamp);
call.rpcRequest, call.timestamp);
}
}
@ -1634,7 +1648,7 @@ public abstract class Server {
throws IOException {
response.reset();
DataOutputStream out = new DataOutputStream(response);
out.writeInt(call.id); // write call id
out.writeInt(call.callId); // write call id
out.writeInt(status.state); // write status
if (status == Status.SUCCESS) {

View File

@ -39,6 +39,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@ -242,7 +243,7 @@ public class WritableRpcEngine implements RpcEngine {
}
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);