From b2edecfdde9b006ea78af307261a936c0ed7d5ec Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Sat, 12 May 2012 01:33:05 +0000 Subject: [PATCH] svn merge -c 1329319 from trunk for HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1337428 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../dev-support/findbugsExcludeFile.xml | 5 + .../java/org/apache/hadoop/ipc/Client.java | 69 +++++----- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 11 +- ...rotocolMetaInfoServerSideTranslatorPB.java | 7 +- .../main/java/org/apache/hadoop/ipc/RPC.java | 19 ++- .../org/apache/hadoop/ipc/RpcClientUtil.java | 3 +- .../apache/hadoop/ipc/RpcPayloadHeader.java | 118 ------------------ .../java/org/apache/hadoop/ipc/Server.java | 50 ++++---- .../apache/hadoop/ipc/WritableRpcEngine.java | 15 ++- .../org/apache/hadoop/util/ProtoUtil.java | 28 +++++ .../src/main/proto/RpcPayloadHeader.proto | 58 +++++++++ .../java/org/apache/hadoop/ipc/TestIPC.java | 3 +- .../hadoop/ipc/TestIPCServerResponder.java | 3 +- .../ipc/TestMultipleProtocolServer.java | 9 +- .../apache/hadoop/ipc/TestProtoBufRpc.java | 3 +- .../hadoop/ipc/TestRPCCompatibility.java | 19 ++- 17 files changed, 203 insertions(+), 219 deletions(-) delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java create mode 100644 hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 8043d59f34a..652d9a28e04 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -182,6 +182,8 @@ Release 2.0.0 - UNRELEASED HADOOP-8113. Correction to BUILDING.txt: HDFS needs ProtocolBuffer, too (not just MapReduce). Contributed by Eugene Koontz. 
+ HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index 855b0284537..44092c00146 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -281,9 +281,14 @@ + + + + + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index cb999f3c41c..083141311b5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -50,8 +50,9 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ipc.RpcPayloadHeader.*; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; +import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -163,10 +164,10 @@ public class Client { final Writable rpcRequest; // the serialized rpc request - RpcPayload Writable rpcResponse; // null if rpc has error IOException error; // exception, null if success - final RpcKind rpcKind; // Rpc EngineKind + final RPC.RpcKind rpcKind; // Rpc EngineKind boolean done; // true when call is done - protected Call(RpcKind rpcKind, Writable param) { + protected Call(RPC.RpcKind rpcKind, Writable param) { this.rpcKind = rpcKind; this.rpcRequest = param; synchronized (Client.this) { @@ -613,7 +614,7 @@ public class Client { this.in = new DataInputStream(new BufferedInputStream(inStream)); } this.out = new DataOutputStream(new BufferedOutputStream(outStream)); - writeHeader(); + writeConnectionContext(); // update last activity time touch(); @@ -704,16 +705,17 @@ public class Client { out.flush(); } - /* Write the protocol header for each connection + /* Write the connection context header for each connection * Out is not synchronized because only the first thread does this. */ - private void writeHeader() throws IOException { + private void writeConnectionContext() throws IOException { // Write out the ConnectionHeader DataOutputBuffer buf = new DataOutputBuffer(); connectionContext.writeTo(buf); // Write out the payload length int bufLen = buf.getLength(); + out.writeInt(bufLen); out.write(buf.getData(), 0, bufLen); } @@ -806,21 +808,22 @@ public class Client { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); - //for serializing the - //data to be written + // Serializing the data to be written. 
+ // Format: + // 0) Length of rest below (1 + 2) + // 1) PayloadHeader - is serialized Delimited hence contains length + // 2) the Payload - the RpcRequest + // d = new DataOutputBuffer(); - d.writeInt(0); // placeholder for data length - RpcPayloadHeader header = new RpcPayloadHeader( - call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id); - header.write(d); + RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader( + call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id); + header.writeDelimitedTo(d); call.rpcRequest.write(d); byte[] data = d.getData(); - int dataLength = d.getLength() - 4; - data[0] = (byte)((dataLength >>> 24) & 0xff); - data[1] = (byte)((dataLength >>> 16) & 0xff); - data[2] = (byte)((dataLength >>> 8) & 0xff); - data[3] = (byte)(dataLength & 0xff); - out.write(data, 0, dataLength + 4);//write the data + + int totalLength = d.getLength(); + out.writeInt(totalLength); // Total Length + out.write(data, 0, totalLength);//PayloadHeader + RpcRequest out.flush(); } } catch(IOException e) { @@ -937,7 +940,7 @@ public class Client { private int index; public ParallelCall(Writable param, ParallelResults results, int index) { - super(RpcKind.RPC_WRITABLE, param); + super(RPC.RpcKind.RPC_WRITABLE, param); this.results = results; this.index = index; } @@ -1022,22 +1025,22 @@ public class Client { } /** - * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)} + * Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)} * for RPC_BUILTIN */ public Writable call(Writable param, InetSocketAddress address) throws InterruptedException, IOException { - return call(RpcKind.RPC_BUILTIN, param, address); + return call(RPC.RpcKind.RPC_BUILTIN, param, address); } /** Make a call, passing param, to the IPC server running at * address, returning the value. Throws exceptions if there are * network problems or if the remote code threw an exception. - * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, + * @deprecated Use {@link #call(RPC.RpcKind, Writable, * ConnectionId)} instead */ @Deprecated - public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address) + public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress address) throws InterruptedException, IOException { return call(rpcKind, param, address, null); } @@ -1047,11 +1050,11 @@ public class Client { * the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. - * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, + * @deprecated Use {@link #call(RPC.RpcKind, Writable, * ConnectionId)} instead */ @Deprecated - public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, + public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, UserGroupInformation ticket) throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0, @@ -1065,11 +1068,11 @@ public class Client { * timeout, returning the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. 
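As an illustration of the request framing described in the Format comment above, here is a self-contained sketch (not the patched Client code): a small varint helper stands in for the length prefix that protobuf's writeDelimitedTo emits, and plain byte arrays stand in for the serialized RpcPayloadHeaderProto and RpcRequest.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Sketch of the new request framing:
 *   0) 4-byte total length of (1) + (2)
 *   1) the payload header, written "delimited" (varint length + bytes)
 *   2) the rpc request payload
 */
public class RequestFramingSketch {

  // writeDelimitedTo prefixes a protobuf message with a base-128 varint length;
  // this helper reproduces that prefix for an already-serialized message.
  static void writeVarint(DataOutputStream out, int value) throws IOException {
    while ((value & ~0x7F) != 0) {
      out.writeByte((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.writeByte(value);
  }

  static byte[] frame(byte[] headerBytes, byte[] requestBytes) throws IOException {
    ByteArrayOutputStream body = new ByteArrayOutputStream();
    DataOutputStream d = new DataOutputStream(body);
    writeVarint(d, headerBytes.length); // 1) delimited header: its own length ...
    d.write(headerBytes);               //    ... followed by the header bytes
    d.write(requestBytes);              // 2) the RpcRequest payload
    d.flush();

    ByteArrayOutputStream frame = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(frame);
    out.writeInt(body.size());          // 0) total length of everything below
    body.writeTo(out);
    out.flush();
    return frame.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] framed = frame(new byte[] {1, 2, 3}, new byte[] {4, 5, 6, 7});
    // 4 (length prefix) + 1 (varint) + 3 (header) + 4 (payload) = 12
    System.out.println("framed length = " + framed.length);
  }
}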
- * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, + * @deprecated Use {@link #call(RPC.RpcKind, Writable, * ConnectionId)} instead */ @Deprecated - public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, + public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout) throws InterruptedException, IOException { @@ -1080,7 +1083,7 @@ public class Client { /** - * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress, + * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress, * Class, UserGroupInformation, int, Configuration)} * except that rpcKind is writable. */ @@ -1090,7 +1093,7 @@ public class Client { throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, conf); - return call(RpcKind.RPC_BUILTIN, param, remoteId); + return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId); } /** @@ -1101,7 +1104,7 @@ public class Client { * value. Throws exceptions if there are network problems or if the remote * code threw an exception. */ - public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, + public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout, Configuration conf) throws InterruptedException, IOException { @@ -1111,12 +1114,12 @@ public class Client { } /** - * Same as {link {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)} + * Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)} * except the rpcKind is RPC_BUILTIN */ public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { - return call(RpcKind.RPC_BUILTIN, param, remoteId); + return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId); } /** @@ -1130,7 +1133,7 @@ public class Client { * Throws exceptions if there are network problems or if the remote code * threw an exception. 
*/ - public Writable call(RpcKind rpcKind, Writable rpcRequest, + public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(rpcKind, rpcRequest); Connection connection = getConnection(remoteId, call); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index befc8f70e03..2d3f91e5e4c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -38,7 +38,6 @@ import org.apache.hadoop.io.DataOutputOutputStream; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto; import org.apache.hadoop.security.UserGroupInformation; @@ -61,7 +60,7 @@ public class ProtobufRpcEngine implements RpcEngine { static { // Register the rpcRequest deserializer for WritableRpcEngine org.apache.hadoop.ipc.Server.registerProtocolEngine( - RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class, + RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class, new Server.ProtoBufRpcInvoker()); } @@ -182,7 +181,7 @@ public class ProtobufRpcEngine implements RpcEngine { HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args); RpcResponseWritable val = null; try { - val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER, + val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, new RpcRequestWritable(rpcRequest), remoteId); } catch (Throwable e) { throw new ServiceException(e); @@ -351,7 +350,7 @@ public class ProtobufRpcEngine implements RpcEngine { numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl .getClass().getName()), secretManager, portRangeConfig); this.verbose = verbose; - registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, + registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, protocolImpl); } @@ -363,10 +362,10 @@ public class ProtobufRpcEngine implements RpcEngine { String protoName, long version) throws IOException { ProtoNameVer pv = new ProtoNameVer(protoName, version); ProtoClassProtoImpl impl = - server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv); + server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv); if (impl == null) { // no match for Protocol AND Version VerProtocolImpl highest = - server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, + server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protoName); if (highest == null) { throw new IOException("Unknown protocol: " + protoName); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java index aaf71f8a4e7..d9d80a84d86 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ipc; import 
org.apache.hadoop.ipc.RPC.Server.VerProtocolImpl; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsRequestProto; @@ -49,7 +48,7 @@ public class ProtocolMetaInfoServerSideTranslatorPB implements String protocol = request.getProtocol(); GetProtocolVersionsResponseProto.Builder builder = GetProtocolVersionsResponseProto.newBuilder(); - for (RpcKind r : RpcKind.values()) { + for (RPC.RpcKind r : RPC.RpcKind.values()) { long[] versions; try { versions = getProtocolVersionForRpcKind(r, protocol); @@ -78,7 +77,7 @@ public class ProtocolMetaInfoServerSideTranslatorPB implements String rpcKind = request.getRpcKind(); long[] versions; try { - versions = getProtocolVersionForRpcKind(RpcKind.valueOf(rpcKind), + versions = getProtocolVersionForRpcKind(RPC.RpcKind.valueOf(rpcKind), protocol); } catch (ClassNotFoundException e1) { throw new ServiceException(e1); @@ -104,7 +103,7 @@ public class ProtocolMetaInfoServerSideTranslatorPB implements return builder.build(); } - private long[] getProtocolVersionForRpcKind(RpcKind rpcKind, + private long[] getProtocolVersionForRpcKind(RPC.RpcKind rpcKind, String protocol) throws ClassNotFoundException { Class protocolClass = Class.forName(protocol); String protocolName = RPC.getProtocolName(protocolClass); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index d0f268ec5d2..56fbd7d5a1b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -42,7 +42,6 @@ import org.apache.commons.logging.*; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.io.*; import org.apache.hadoop.ipc.Client.ConnectionId; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; @@ -73,6 +72,18 @@ import com.google.protobuf.BlockingService; * the protocol instance is transmitted. */ public class RPC { + public enum RpcKind { + RPC_BUILTIN ((short) 1), // Used for built in calls by tests + RPC_WRITABLE ((short) 2), // Use WritableRpcEngine + RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine + final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size + private static final short FIRST_INDEX = RPC_BUILTIN.value; + public final short value; //TODO make it private + + RpcKind(short val) { + this.value = val; + } + } interface RpcInvoker { /** @@ -777,7 +788,7 @@ public class RPC { ArrayList> protocolImplMapArray = new ArrayList>(RpcKind.MAX_INDEX); - Map getProtocolImplMap(RpcKind rpcKind) { + Map getProtocolImplMap(RPC.RpcKind rpcKind) { if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds for (int i=0; i <= RpcKind.MAX_INDEX; ++i) { protocolImplMapArray.add( @@ -821,7 +832,7 @@ public class RPC { @SuppressWarnings("unused") // will be useful later. 
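The RpcKind enum now lives in RPC and keeps its explicit short values, which Server uses to size and index its per-kind structures. A minimal sketch of that pattern, using hypothetical names rather than the real Server fields:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch: RpcKind carries an explicit short value, and per-kind state lives
 * in a list sized by MAX_INDEX so each kind gets its own slot.
 */
public class RpcKindSketch {
  public enum RpcKind {
    RPC_BUILTIN((short) 1),         // used for built-in calls by tests
    RPC_WRITABLE((short) 2),        // WritableRpcEngine
    RPC_PROTOCOL_BUFFER((short) 3); // ProtobufRpcEngine

    static final short MAX_INDEX = RPC_PROTOCOL_BUFFER.value;
    public final short value;
    RpcKind(short value) { this.value = value; }
  }

  // one protocol-implementation map per rpc kind, lazily initialized
  private final List<Map<String, Object>> perKind = new ArrayList<>();

  Map<String, Object> getMapFor(RpcKind kind) {
    if (perKind.isEmpty()) {
      for (int i = 0; i <= RpcKind.MAX_INDEX; i++) {
        perKind.add(new HashMap<String, Object>());
      }
    }
    return perKind.get(kind.value);
  }

  public static void main(String[] args) {
    RpcKindSketch s = new RpcKindSketch();
    s.getMapFor(RpcKind.RPC_WRITABLE).put("TestProtocol0", new Object());
    System.out.println(s.getMapFor(RpcKind.RPC_WRITABLE).size()); // 1
  }
}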
- VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind, + VerProtocolImpl[] getSupportedProtocolVersions(RPC.RpcKind rpcKind, String protocolName) { VerProtocolImpl[] resultk = new VerProtocolImpl[getProtocolImplMap(rpcKind).size()]; @@ -900,7 +911,7 @@ public class RPC { } @Override - public Writable call(RpcKind rpcKind, String protocol, + public Writable call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest, long receiveTime) throws Exception { return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java index cdbc034ea2f..2623f9ede5c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java @@ -27,7 +27,6 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; @@ -107,7 +106,7 @@ public class RpcClientUtil { * @throws IOException */ public static boolean isMethodSupported(Object rpcProxy, Class protocol, - RpcKind rpcKind, long version, String methodName) throws IOException { + RPC.RpcKind rpcKind, long version, String methodName) throws IOException { InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy); Map versionMap = getVersionSignatureMap( serverAddress, protocol.getName(), rpcKind.toString()); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java deleted file mode 100644 index 6e97159fb46..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java +++ /dev/null @@ -1,118 +0,0 @@ -package org.apache.hadoop.ipc; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Writable; - -/** - * This is the rpc payload header. It is sent with every rpc call - *
- * The format of RPC call is as follows:
- * +---------------------------------------------------+
- * |  Rpc length in bytes (header + payload length)    |
- * +---------------------------------------------------+
- * |      Rpc Header       |       Rpc Payload         |
- * +---------------------------------------------------+
- * 
- * The format of Rpc Header is:
- * +----------------------------------+
- * |  RpcKind (1 bytes)               |      
- * +----------------------------------+
- * |  RpcPayloadOperation (1 bytes)   |      
- * +----------------------------------+
- * |  Call ID (4 bytes)               |      
- * +----------------------------------+
- * 
- * {@link RpcKind} determines the type of serialization used for Rpc Payload.
- * 
- *

- * Note this header does NOT have its own version number, - * it used the version number from the connection header. - */ -public class RpcPayloadHeader implements Writable { - public enum RpcPayloadOperation { - RPC_FINAL_PAYLOAD ((short)1), - RPC_CONTINUATION_PAYLOAD ((short)2), // not implemented yet - RPC_CLOSE_CONNECTION ((short)3); // close the rpc connection - - private final short code; - private static final short FIRST_INDEX = RPC_FINAL_PAYLOAD.code; - RpcPayloadOperation(short val) { - this.code = val; - } - - public void write(DataOutput out) throws IOException { - out.writeByte(code); - } - - static RpcPayloadOperation readFields(DataInput in) throws IOException { - short inValue = in.readByte(); - return RpcPayloadOperation.values()[inValue - FIRST_INDEX]; - } - } - - public enum RpcKind { - RPC_BUILTIN ((short) 1), // Used for built in calls by tests - RPC_WRITABLE ((short) 2), // Use WritableRpcEngine - RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine - final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size - private static final short FIRST_INDEX = RPC_BUILTIN.value; - private final short value; - - RpcKind(short val) { - this.value = val; - } - - public void write(DataOutput out) throws IOException { - out.writeByte(value); - } - - static RpcKind readFields(DataInput in) throws IOException { - short inValue = in.readByte(); - return RpcKind.values()[inValue - FIRST_INDEX]; - } - } - - private RpcKind kind; - private RpcPayloadOperation operation; - private int callId; - - public RpcPayloadHeader() { - kind = RpcKind.RPC_WRITABLE; - operation = RpcPayloadOperation.RPC_CLOSE_CONNECTION; - } - - public RpcPayloadHeader(RpcKind kind, RpcPayloadOperation op, int callId) { - this.kind = kind; - this.operation = op; - this.callId = callId; - } - - int getCallId() { - return callId; - } - - RpcKind getkind() { - return kind; - } - - RpcPayloadOperation getOperation() { - return operation; - } - - @Override - public void write(DataOutput out) throws IOException { - kind.write(out); - operation.write(out); - out.writeInt(callId); - } - - @Override - public void readFields(DataInput in) throws IOException { - kind = RpcKind.readFields(in); - operation = RpcPayloadOperation.readFields(in); - this.callId = in.readInt(); - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 8628bb0c485..24c2f7beaa7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -72,11 +72,10 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.VersionMismatch; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation; import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; +import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SaslRpcServer; @@ -170,8 +169,8 @@ public abstract class Server { this.rpcRequestWrapperClass = rpcRequestWrapperClass; } } - 
static Map rpcKindMap = new - HashMap(4); + static Map rpcKindMap = new + HashMap(4); @@ -185,7 +184,7 @@ public abstract class Server { * @param rpcInvoker - use to process the calls on SS. */ - public static void registerProtocolEngine(RpcKind rpcKind, + public static void registerProtocolEngine(RPC.RpcKind rpcKind, Class rpcRequestWrapperClass, RpcInvoker rpcInvoker) { RpcKindMapValue old = @@ -201,14 +200,14 @@ public abstract class Server { } public Class getRpcRequestWrapper( - RpcKind rpcKind) { + RpcKindProto rpcKind) { if (rpcRequestClass != null) return rpcRequestClass; - RpcKindMapValue val = rpcKindMap.get(rpcKind); + RpcKindMapValue val = rpcKindMap.get(ProtoUtil.convert(rpcKind)); return (val == null) ? null : val.rpcRequestWrapperClass; } - public static RpcInvoker getRpcInvoker(RpcKind rpcKind) { + public static RpcInvoker getRpcInvoker(RPC.RpcKind rpcKind) { RpcKindMapValue val = rpcKindMap.get(rpcKind); return (val == null) ? null : val.rpcInvoker; } @@ -403,12 +402,12 @@ public abstract class Server { private long timestamp; // time received when response is null // time served when response is not null private ByteBuffer rpcResponse; // the response for this call - private final RpcKind rpcKind; + private final RPC.RpcKind rpcKind; public Call(int id, Writable param, Connection connection) { - this( id, param, connection, RpcKind.RPC_BUILTIN ); + this( id, param, connection, RPC.RpcKind.RPC_BUILTIN ); } - public Call(int id, Writable param, Connection connection, RpcKind kind) { + public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) { this.callId = id; this.rpcRequest = param; this.connection = connection; @@ -1366,7 +1365,6 @@ public abstract class Server { if (data == null) { dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); - if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) { // covers the !useSasl too dataLengthBuffer.clear(); @@ -1555,22 +1553,27 @@ public abstract class Server { private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); - RpcPayloadHeader header = new RpcPayloadHeader(); - header.readFields(dis); // Read the RpcPayload header + RpcPayloadHeaderProto header = RpcPayloadHeaderProto.parseDelimitedFrom(dis); if (LOG.isDebugEnabled()) LOG.debug(" got #" + header.getCallId()); - if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) { + if (!header.hasRpcOp()) { + throw new IOException(" IPC Server: No rpc op in rpcPayloadHeader"); + } + if (header.getRpcOp() != RpcPayloadOperationProto.RPC_FINAL_PAYLOAD) { throw new IOException("IPC Server does not implement operation" + - header.getOperation()); + header.getRpcOp()); } // If we know the rpc kind, get its class so that we can deserialize // (Note it would make more sense to have the handler deserialize but // we continue with this original design. 
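On the receive side, processData now uses RpcPayloadHeaderProto.parseDelimitedFrom to peel the header off the buffered payload. A counterpart sketch to the client-side framing example above, again with plain byte arrays standing in for the real messages:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;

/**
 * Sketch: given the bytes that followed the 4-byte total length, read the
 * varint-delimited header first; whatever remains is the RpcRequest payload.
 */
public class RequestParsingSketch {

  static int readVarint(DataInputStream in) throws IOException {
    int result = 0;
    int shift = 0;
    while (true) {
      int b = in.readUnsignedByte();
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
      shift += 7;
    }
  }

  public static void main(String[] args) throws IOException {
    // varint(3), three header bytes, four payload bytes -- as produced by the
    // framing sketch (the outer 4-byte length is assumed already consumed).
    byte[] data = {3, 1, 2, 3, 4, 5, 6, 7};
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));

    int headerLen = readVarint(in);           // delimited header: length first
    byte[] header = new byte[headerLen];
    in.readFully(header);                     // then the serialized header
    byte[] payload = new byte[in.available()];
    in.readFully(payload);                    // whatever remains is the request

    System.out.println(Arrays.toString(header));   // [1, 2, 3]
    System.out.println(Arrays.toString(payload));  // [4, 5, 6, 7]
  }
}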
+ if (!header.hasRpcKind()) { + throw new IOException(" IPC Server: No rpc kind in rpcPayloadHeader"); + } Class rpcRequestClass = - getRpcRequestWrapper(header.getkind()); + getRpcRequestWrapper(header.getRpcKind()); if (rpcRequestClass == null) { - LOG.warn("Unknown rpc kind " + header.getkind() + + LOG.warn("Unknown rpc kind " + header.getRpcKind() + " from client " + getHostAddress()); final Call readParamsFailedCall = new Call(header.getCallId(), null, this); @@ -1578,7 +1581,7 @@ public abstract class Server { setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, IOException.class.getName(), - "Unknown rpc kind " + header.getkind()); + "Unknown rpc kind " + header.getRpcKind()); responder.doRespond(readParamsFailedCall); return; } @@ -1589,7 +1592,7 @@ public abstract class Server { } catch (Throwable t) { LOG.warn("Unable to read call parameters for client " + getHostAddress() + "on connection protocol " + - this.protocolName + " for rpcKind " + header.getkind(), t); + this.protocolName + " for rpcKind " + header.getRpcKind(), t); final Call readParamsFailedCall = new Call(header.getCallId(), null, this); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); @@ -1601,7 +1604,8 @@ public abstract class Server { return; } - Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind()); + Call call = new Call(header.getCallId(), rpcRequest, this, + ProtoUtil.convert(header.getRpcKind())); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } @@ -1991,11 +1995,11 @@ public abstract class Server { */ @Deprecated public Writable call(Writable param, long receiveTime) throws Exception { - return call(RpcKind.RPC_BUILTIN, null, param, receiveTime); + return call(RPC.RpcKind.RPC_BUILTIN, null, param, receiveTime); } /** Called for each call. 
*/ - public abstract Writable call(RpcKind rpcKind, String protocol, + public abstract Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws Exception; /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 9104f6ff834..6fd800cad6e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -33,7 +33,6 @@ import org.apache.commons.logging.*; import org.apache.hadoop.io.*; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -75,7 +74,7 @@ public class WritableRpcEngine implements RpcEngine { * Register the rpcRequest deserializer for WritableRpcEngine */ private static synchronized void initialize() { - org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE, + org.apache.hadoop.ipc.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE, Invocation.class, new Server.WritableRpcInvoker()); isInitialized = true; } @@ -223,7 +222,7 @@ public class WritableRpcEngine implements RpcEngine { } ObjectWritable value = (ObjectWritable) - client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); + client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); if (LOG.isDebugEnabled()) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); @@ -412,12 +411,12 @@ public class WritableRpcEngine implements RpcEngine { protocolImpl.getClass()); } // register protocol class and its super interfaces - registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl); + registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl); protocols = RPC.getProtocolInterfaces(protocolClass); } for (Class p : protocols) { if (!p.equals(VersionedProtocol.class)) { - registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl); + registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl); } } @@ -461,7 +460,7 @@ public class WritableRpcEngine implements RpcEngine { // registered directly. 
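Both engines register themselves with Server.registerProtocolEngine, keyed by RPC.RpcKind, so the server can look up the request-wrapper class and invoker for whatever kind the header announces. A sketch of that registry pattern with hypothetical stand-in types:

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: each RpcEngine registers, per RpcKind, the wrapper class used to
 * deserialize requests and the invoker used to process them.
 */
public class EngineRegistrySketch {
  enum RpcKind { RPC_BUILTIN, RPC_WRITABLE, RPC_PROTOCOL_BUFFER }

  interface RpcInvoker {
    String call(String protocol, Object rpcRequest);
  }

  static class RpcKindMapValue {
    final Class<?> rpcRequestWrapperClass;
    final RpcInvoker rpcInvoker;
    RpcKindMapValue(Class<?> wrapper, RpcInvoker invoker) {
      this.rpcRequestWrapperClass = wrapper;
      this.rpcInvoker = invoker;
    }
  }

  static final Map<RpcKind, RpcKindMapValue> rpcKindMap =
      new HashMap<RpcKind, RpcKindMapValue>(4);

  static void registerProtocolEngine(RpcKind kind, Class<?> wrapper,
      RpcInvoker invoker) {
    rpcKindMap.put(kind, new RpcKindMapValue(wrapper, invoker));
  }

  public static void main(String[] args) {
    // e.g. a Writable-based engine registering its request wrapper and invoker
    registerProtocolEngine(RpcKind.RPC_WRITABLE, String.class,
        new RpcInvoker() {
          @Override
          public String call(String protocol, Object rpcRequest) {
            return "handled " + rpcRequest + " for " + protocol;
          }
        });
    RpcKindMapValue val = rpcKindMap.get(RpcKind.RPC_WRITABLE);
    System.out.println(val.rpcInvoker.call("TestProtocol0", "echo"));
  }
}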
// Send the call to the highest protocol version VerProtocolImpl highest = server.getHighestSupportedProtocol( - RpcKind.RPC_WRITABLE, protocolName); + RPC.RpcKind.RPC_WRITABLE, protocolName); if (highest == null) { throw new IOException("Unknown protocol: " + protocolName); } @@ -473,10 +472,10 @@ public class WritableRpcEngine implements RpcEngine { ProtoNameVer pv = new ProtoNameVer(call.declaringClassProtocolName, clientVersion); protocolImpl = - server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv); + server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv); if (protocolImpl == null) { // no match for Protocol AND Version VerProtocolImpl highest = - server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, + server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, protoName); if (highest == null) { throw new IOException("Unknown protocol: " + protoName); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 3ee306b6290..0618f0631c8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -21,8 +21,10 @@ package org.apache.hadoop.util; import java.io.DataInput; import java.io.IOException; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto; +import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation; @@ -135,4 +137,30 @@ public abstract class ProtoUtil { } return ugi; } + + static RpcKindProto convert(RPC.RpcKind kind) { + switch (kind) { + case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN; + case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE; + case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER; + } + return null; + } + + + public static RPC.RpcKind convert( RpcKindProto kind) { + switch (kind) { + case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN; + case RPC_WRITABLE: return RPC.RpcKind.RPC_WRITABLE; + case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER; + } + return null; + } + + public static RpcPayloadHeaderProto makeRpcPayloadHeader(RPC.RpcKind rpcKind, + RpcPayloadOperationProto operation, int callId) { + RpcPayloadHeaderProto.Builder result = RpcPayloadHeaderProto.newBuilder(); + result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId); + return result.build(); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto new file mode 100644 index 00000000000..42dea3bde3e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +option java_package = "org.apache.hadoop.ipc.protobuf"; +option java_outer_classname = "RpcPayloadHeaderProtos"; +option java_generate_equals_and_hash = true; + + +/** + * This is the rpc payload header. It is sent with every rpc call. + * + * The format of RPC call is as follows: + * +-----------------------------------------------------+ + * | Rpc length in bytes | + * +-----------------------------------------------------+ + * | RpcPayloadHeader - serialized delimited ie has len | + * +-----------------------------------------------------+ + * | RpcRequest Payload | + * +-----------------------------------------------------+ + * + */ + + + +/** + * RpcKind determine the rpcEngine and the serialization of the rpc payload + */ +enum RpcKindProto { + RPC_BUILTIN = 0; // Used for built in calls by tests + RPC_WRITABLE = 1; // Use WritableRpcEngine + RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine +} + +enum RpcPayloadOperationProto { + RPC_FINAL_PAYLOAD = 0; // The final payload + RPC_CONTINUATION_PAYLOAD = 1; // not implemented yet + RPC_CLOSE_CONNECTION = 2; // close the rpc connection +} + +message RpcPayloadHeaderProto { // the header for the RpcRequest + optional RpcKindProto rpcKind = 1; + optional RpcPayloadOperationProto rpcOp = 2; + optional uint32 callId = 3; // each rpc has a callId that is also used in response +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index efb2dc1126d..5797bb524bc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -25,7 +25,6 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.net.NetUtils; @@ -99,7 +98,7 @@ public class TestIPC { } @Override - public Writable call(RpcKind rpcKind, String protocol, Writable param, + public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws IOException { if (sleep) { // sleep a bit diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java index 5675cbfddf9..5f5cc1bcd32 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java @@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; /** @@ -73,7 +72,7 @@ public 
class TestIPCServerResponder extends TestCase { } @Override - public Writable call(RpcKind rpcKind, String protocol, Writable param, + public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws IOException { if (sleep) { try { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java index f5acd93eb2f..0446b425087 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java @@ -23,7 +23,6 @@ import java.net.InetSocketAddress; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl; import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; @@ -178,9 +177,9 @@ public class TestMultipleProtocolServer { // create a server with two handlers server = RPC.getServer(Foo0.class, new Foo0Impl(), ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); - server.addProtocol(RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); - server.addProtocol(RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); // Add Protobuf server @@ -189,7 +188,7 @@ public class TestMultipleProtocolServer { new PBServerImpl(); BlockingService service = TestProtobufRpcProto .newReflectiveBlockingService(pbServerImpl); - server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, service); server.start(); addr = NetUtils.getConnectAddress(server); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index 3b9140afc4c..9e7b2694411 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; @@ -122,7 +121,7 @@ public class TestProtoBufRpc { BlockingService service2 = TestProtobufRpc2Proto .newReflectiveBlockingService(server2Impl); - server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, service2); server.start(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java index aca33ef25b7..50ae210ea9e 
100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java @@ -31,7 +31,6 @@ import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; @@ -134,7 +133,7 @@ public class TestRPCCompatibility { TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -201,7 +200,7 @@ System.out.println("echo int is NOT supported"); TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -222,7 +221,7 @@ System.out.println("echo int is NOT supported"); TestImpl2 impl = new TestImpl2(); server = RPC.getServer(TestProtocol2.class, impl, ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -316,11 +315,11 @@ System.out.println("echo int is NOT supported"); TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, TestProtocol2.versionID, addr, conf); boolean supported = RpcClientUtil.isMethodSupported(proxy, - TestProtocol2.class, RpcKind.RPC_WRITABLE, + TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE, RPC.getProtocolVersion(TestProtocol2.class), "echo"); Assert.assertTrue(supported); supported = RpcClientUtil.isMethodSupported(proxy, - TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER, + TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(TestProtocol2.class), "echo"); Assert.assertFalse(supported); } @@ -334,7 +333,7 @@ System.out.println("echo int is NOT supported"); TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); ProtocolMetaInfoServerSideTranslatorPB xlator = @@ -343,13 +342,13 @@ System.out.println("echo int is NOT supported"); GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( null, createGetProtocolSigRequestProto(TestProtocol1.class, - RpcKind.RPC_PROTOCOL_BUFFER)); + RPC.RpcKind.RPC_PROTOCOL_BUFFER)); //No signatures should be found Assert.assertEquals(0, resp.getProtocolSignatureCount()); resp = xlator.getProtocolSignature( null, createGetProtocolSigRequestProto(TestProtocol1.class, - RpcKind.RPC_WRITABLE)); + RPC.RpcKind.RPC_WRITABLE)); Assert.assertEquals(1, resp.getProtocolSignatureCount()); ProtocolSignatureProto sig = 
resp.getProtocolSignatureList().get(0); Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); @@ -366,7 +365,7 @@ System.out.println("echo int is NOT supported"); } private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( - Class protocol, RpcKind rpcKind) { + Class protocol, RPC.RpcKind rpcKind) { GetProtocolSignatureRequestProto.Builder builder = GetProtocolSignatureRequestProto.newBuilder(); builder.setProtocol(protocol.getName());
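Because the in-memory RPC.RpcKind values start at 1 while the on-wire RpcKindProto values start at 0, ProtoUtil maps between them with explicit switches rather than by ordinal. A sketch of that conversion using local stand-in enums instead of the real generated types:

/**
 * Sketch of the ProtoUtil.convert pattern: explicit two-way mapping between
 * the Java-side kind and the wire-side kind.
 */
public class RpcKindConversionSketch {
  enum RpcKind { RPC_BUILTIN, RPC_WRITABLE, RPC_PROTOCOL_BUFFER }
  enum RpcKindProto { RPC_BUILTIN, RPC_WRITABLE, RPC_PROTOCOL_BUFFER }

  static RpcKindProto toProto(RpcKind kind) {
    switch (kind) {
      case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN;
      case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE;
      case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER;
      default: return null;
    }
  }

  static RpcKind fromProto(RpcKindProto kind) {
    switch (kind) {
      case RPC_BUILTIN: return RpcKind.RPC_BUILTIN;
      case RPC_WRITABLE: return RpcKind.RPC_WRITABLE;
      case RPC_PROTOCOL_BUFFER: return RpcKind.RPC_PROTOCOL_BUFFER;
      default: return null;
    }
  }

  public static void main(String[] args) {
    RpcKind kind = RpcKind.RPC_PROTOCOL_BUFFER;
    System.out.println(kind + " <-> " + toProto(kind)); // maps by name, not ordinal
    System.out.println(fromProto(toProto(kind)) == kind); // true
  }
}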