HADOOP-9630. [RPC v9] Remove IpcSerializationType. (Junping Du via llu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1491683 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2a05c63004
commit
ff2dcbc408
|
@ -30,17 +30,19 @@ Release 2.1.0-beta - UNRELEASED
|
||||||
|
|
||||||
HADOOP-8886. Remove KFS support. (eli)
|
HADOOP-8886. Remove KFS support. (eli)
|
||||||
|
|
||||||
HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to
|
HADOOP-9163. [RPC v9] The rpc msg in ProtobufRpcEngine.proto should be moved out to
|
||||||
avoid an extra copy (Sanjay Radia)
|
avoid an extra copy (Sanjay Radia)
|
||||||
|
|
||||||
HADOOP-9151 Include RPC error info in RpcResponseHeader instead of sending
|
HADOOP-9151. [RPC v9] Include RPC error info in RpcResponseHeader instead of sending
|
||||||
it separately (sanjay Radia)
|
it separately (sanjay Radia)
|
||||||
|
|
||||||
HADOOP-9380 Add totalLength to rpc response (sanjay Radia)
|
HADOOP-9380. [RPC v9] Add totalLength to rpc response (sanjay Radia)
|
||||||
|
|
||||||
HADOOP-9425 Add error codes to rpc-response (sanjay Radia)
|
HADOOP-9425. [RPC v9] Add error codes to rpc-response (sanjay Radia)
|
||||||
|
|
||||||
HADOOP-9194. RPC support for QoS. (Junping Du via llu)
|
HADOOP-9194. [RPC v9] RPC support for QoS. (Junping Du via llu)
|
||||||
|
|
||||||
|
HADOOP-9630. [RPC v9] Remove IpcSerializationType. (Junping Du via llu)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
|
|
|
@ -748,8 +748,6 @@ public class Client {
|
||||||
* +----------------------------------+
|
* +----------------------------------+
|
||||||
* | Authmethod (1 byte) |
|
* | Authmethod (1 byte) |
|
||||||
* +----------------------------------+
|
* +----------------------------------+
|
||||||
* | IpcSerializationType (1 byte) |
|
|
||||||
* +----------------------------------+
|
|
||||||
*/
|
*/
|
||||||
private void writeConnectionHeader(OutputStream outStream)
|
private void writeConnectionHeader(OutputStream outStream)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -759,7 +757,6 @@ public class Client {
|
||||||
out.write(Server.CURRENT_VERSION);
|
out.write(Server.CURRENT_VERSION);
|
||||||
out.write(serviceClass);
|
out.write(serviceClass);
|
||||||
authMethod.write(out);
|
authMethod.write(out);
|
||||||
Server.IpcSerializationType.PROTOBUF.write(out);
|
|
||||||
out.flush();
|
out.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -161,22 +161,6 @@ public abstract class Server {
|
||||||
*/
|
*/
|
||||||
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
||||||
|
|
||||||
/**
|
|
||||||
* Serialization type for ConnectionContext and RpcRequestHeader
|
|
||||||
*/
|
|
||||||
public enum IpcSerializationType {
|
|
||||||
// Add new serialization type to the end without affecting the enum order
|
|
||||||
PROTOBUF;
|
|
||||||
|
|
||||||
void write(DataOutput out) throws IOException {
|
|
||||||
out.writeByte(this.ordinal());
|
|
||||||
}
|
|
||||||
|
|
||||||
static IpcSerializationType fromByte(byte b) {
|
|
||||||
return IpcSerializationType.values()[b];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the user accidentally sends an HTTP GET to an IPC port, we detect this
|
* If the user accidentally sends an HTTP GET to an IPC port, we detect this
|
||||||
* and send back a nicer response.
|
* and send back a nicer response.
|
||||||
|
@ -1319,7 +1303,7 @@ public abstract class Server {
|
||||||
if (!connectionHeaderRead) {
|
if (!connectionHeaderRead) {
|
||||||
//Every connection is expected to send the header.
|
//Every connection is expected to send the header.
|
||||||
if (connectionHeaderBuf == null) {
|
if (connectionHeaderBuf == null) {
|
||||||
connectionHeaderBuf = ByteBuffer.allocate(4);
|
connectionHeaderBuf = ByteBuffer.allocate(3);
|
||||||
}
|
}
|
||||||
count = channelRead(channel, connectionHeaderBuf);
|
count = channelRead(channel, connectionHeaderBuf);
|
||||||
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
|
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
|
||||||
|
@ -1352,13 +1336,6 @@ public abstract class Server {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
IpcSerializationType serializationType = IpcSerializationType
|
|
||||||
.fromByte(connectionHeaderBuf.get(3));
|
|
||||||
if (serializationType != IpcSerializationType.PROTOBUF) {
|
|
||||||
respondUnsupportedSerialization(serializationType);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dataLengthBuffer.clear();
|
dataLengthBuffer.clear();
|
||||||
if (authMethod == null) {
|
if (authMethod == null) {
|
||||||
throw new IOException("Unable to read authentication method");
|
throw new IOException("Unable to read authentication method");
|
||||||
|
@ -1551,18 +1528,6 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void respondUnsupportedSerialization(IpcSerializationType st) throws IOException {
|
|
||||||
String errMsg = "Server IPC version " + CURRENT_VERSION
|
|
||||||
+ " do not support serilization " + st.toString();
|
|
||||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
|
||||||
|
|
||||||
Call fakeCall = new Call(-1, null, this);
|
|
||||||
setupResponse(buffer, fakeCall,
|
|
||||||
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNSUPPORTED_SERIALIZATION,
|
|
||||||
null, IpcException.class.getName(), errMsg);
|
|
||||||
responder.doRespond(fakeCall);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setupHttpRequestOnIpcPortResponse() throws IOException {
|
private void setupHttpRequestOnIpcPortResponse() throws IOException {
|
||||||
Call fakeCall = new Call(0, null, this);
|
Call fakeCall = new Call(0, null, this);
|
||||||
fakeCall.setResponse(ByteBuffer.wrap(
|
fakeCall.setResponse(ByteBuffer.wrap(
|
||||||
|
|
Loading…
Reference in New Issue