HDFS-15790. Make ProtobufRpcEngineProtos and ProtobufRpcEngineProtos2 Co-Exist (#2767)

This commit is contained in:
Vinayakumar B 2021-05-24 15:15:39 +05:30 committed by GitHub
parent 6bb0892f6e
commit 2bbeae3240
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 13699 additions and 176 deletions

View File

@ -413,7 +413,12 @@
</execution>
<execution>
<id>src-test-compile-protoc</id>
<configuration><skip>false</skip></configuration>
<configuration>
<skip>false</skip>
<excludes>
<exclude>*legacy.proto</exclude>
</excludes>
</configuration>
</execution>
</executions>
</plugin>
@ -434,6 +439,10 @@
<id>replace-generated-test-sources</id>
<configuration>
<skip>false</skip>
<excludes>
<exclude>**/TestProtosLegacy.java</exclude>
<exclude>**/TestRpcServiceProtosLegacy.java</exclude>
</excludes>
</configuration>
</execution>
<execution>
@ -446,6 +455,7 @@
<exclude>**/RpcWritable.java</exclude>
<exclude>**/ProtobufRpcEngineCallback.java</exclude>
<exclude>**/ProtobufRpcEngine.java</exclude>
<exclude>**/ProtobufRpcEngine2.java</exclude>
<exclude>**/ProtobufRpcEngineProtos.java</exclude>
</excludes>
</configuration>
@ -454,6 +464,9 @@
<id>replace-test-sources</id>
<configuration>
<skip>false</skip>
<excludes>
<exclude>**/TestProtoBufRpc.java</exclude>
</excludes>
</configuration>
</execution>
</executions>
@ -1077,6 +1090,18 @@
</sources>
</configuration>
</execution>
<execution>
<id>add-test-source-legacy-protobuf</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/src/test/arm-java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
@ -1118,6 +1143,28 @@
</includes>
</configuration>
</execution>
<execution>
<id>src-test-compile-protoc-legacy</id>
<phase>generate-test-sources</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<skip>false</skip>
<!--Generating with old protobuf version for backward compatibility-->
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<includeDependenciesInDescriptorSet>false</includeDependenciesInDescriptorSet>
<protoSourceRoot>${basedir}/src/test/proto</protoSourceRoot>
<outputDirectory>${project.build.directory}/generated-test-sources/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
<includes>
<include>test_legacy.proto</include>
<include>test_rpc_service_legacy.proto</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ipc;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
@ -31,16 +30,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,9 +66,8 @@ public class ProtobufRpcEngine implements RpcEngine {
ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for ProtobufRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
new Server.ProtoBufRpcInvoker());
//These will be used in server side, which is always ProtobufRpcEngine2
ProtobufRpcEngine2.registerProtocolEngine();
}
private static final ClientCache CLIENTS = new ClientCache();
@ -352,8 +349,6 @@ public class ProtobufRpcEngine implements RpcEngine {
return CLIENTS.getClient(conf, SocketFactory.getDefault(),
RpcWritable.Buffer.class);
}
@Override
public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
@ -366,25 +361,17 @@ public class ProtobufRpcEngine implements RpcEngine {
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig, alignmentContext);
}
public static class Server extends RPC.Server {
/**
* Server implementation is always ProtobufRpcEngine2 based implementation,
* supports backward compatibility for protobuf 2.5 based implementations,
* which uses non-shaded protobuf classes.
*/
public static class Server extends ProtobufRpcEngine2.Server {
static final ThreadLocal<ProtobufRpcEngineCallback> currentCallback =
new ThreadLocal<>();
static final ThreadLocal<CallInfo> currentCallInfo = new ThreadLocal<>();
private static final RpcInvoker RPC_INVOKER = new ProtoBufRpcInvoker();
static class CallInfo {
private final RPC.Server server;
private final String methodName;
public CallInfo(RPC.Server server, String methodName) {
this.server = server;
this.methodName = methodName;
}
}
static class ProtobufRpcEngineCallbackImpl
implements ProtobufRpcEngineCallback {
@ -394,9 +381,9 @@ public class ProtobufRpcEngine implements RpcEngine {
private final long setupTime;
public ProtobufRpcEngineCallbackImpl() {
this.server = currentCallInfo.get().server;
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = currentCallInfo.get().methodName;
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
}
@ -443,144 +430,58 @@ public class ProtobufRpcEngine implements RpcEngine {
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
super(bindAddress, port, null, numHandlers,
numReaders, queueSizePerHandler, conf,
serverNameFromClass(protocolImpl.getClass()), secretManager,
portRangeConfig);
setAlignmentContext(alignmentContext);
this.verbose = verbose;
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl);
}
@Override
protected RpcInvoker getServerRpcInvoker(RpcKind rpcKind) {
if (rpcKind == RpcKind.RPC_PROTOCOL_BUFFER) {
return RPC_INVOKER;
}
return super.getServerRpcInvoker(rpcKind);
super(protocolClass, protocolImpl, conf, bindAddress, port, numHandlers,
numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig, alignmentContext);
}
/**
* Protobuf invoker for {@link RpcInvoker}
* This implementation is same as
* ProtobufRpcEngine2.Server.ProtobufInvoker#call(..)
* except this implementation uses non-shaded protobuf classes from legacy
* protobuf version (default 2.5.0).
*/
static class ProtoBufRpcInvoker implements RpcInvoker {
private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
String protoName, long clientVersion) throws RpcServerException {
ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
ProtoClassProtoImpl impl =
server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
if (impl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
protoName);
if (highest == null) {
throw new RpcNoSuchProtocolException(
"Unknown protocol: " + protoName);
}
// protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
return impl;
static RpcWritable processCall(RPC.Server server,
String connectionProtocolName, RpcWritable.Buffer request,
String methodName, ProtoClassProtoImpl protocolImpl) throws Exception {
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
if (methodDescriptor == null) {
String msg = "Unknown method " + methodName + " called on "
+ connectionProtocolName + " protocol.";
LOG.warn(msg);
throw new RpcNoSuchMethodException(msg);
}
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = request.getValue(prototype);
@Override
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the response.
* See {@link HadoopRpcResponseProto}
*
* In this method there three types of exceptions possible and they are
* returned in response as follows.
* <ol>
* <li> Exceptions encountered in this method that are returned
* as {@link RpcServerException} </li>
* <li> Exceptions thrown by the service is wrapped in ServiceException.
* In that this method returns in response the exception thrown by the
* service.</li>
* <li> Other exceptions thrown by the service. They are returned as
* it is.</li>
* </ol>
*/
public Writable call(RPC.Server server, String connectionProtocolName,
Writable writableRequest, long receiveTime) throws Exception {
RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
RequestHeaderProto rpcRequest = request.getRequestHeader();
String methodName = rpcRequest.getMethodName();
/**
* RPCs for a particular interface (ie protocol) are done using a
* IPC connection that is setup using rpcProxy.
* The rpcProxy's has a declared protocol name that is
* sent form client to server at connection time.
*
* Each Rpc call also sends a protocol name
* (called declaringClassprotocolName). This name is usually the same
* as the connection protocol name except in some cases.
* For example metaProtocols such ProtocolInfoProto which get info
* about the protocol reuse the connection but need to indicate that
* the actual protocol is different (i.e. the protocol is
* ProtocolInfoProto) since they reuse the connection; in this case
* the declaringClassProtocolName field is set to the ProtocolInfoProto.
*/
String declaringClassProtoName =
rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
return call(server, connectionProtocolName, request, receiveTime,
methodName, declaringClassProtoName, clientVersion);
}
protected Writable call(RPC.Server server, String connectionProtocolName,
RpcWritable.Buffer request, long receiveTime, String methodName,
String declaringClassProtoName, long clientVersion) throws Exception {
if (server.verbose)
LOG.info("Call: connectionProtocolName=" + connectionProtocolName +
", method=" + methodName);
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
declaringClassProtoName, clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
if (methodDescriptor == null) {
String msg = "Unknown method " + methodName + " called on "
+ connectionProtocolName + " protocol.";
LOG.warn(msg);
throw new RpcNoSuchMethodException(msg);
Message result;
Call currentCall = Server.getCurCall().get();
try {
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
CURRENT_CALL_INFO.set(new CallInfo(server, methodName));
currentCall.setDetailedMetricsName(methodName);
result = service.callBlockingMethod(methodDescriptor, null, param);
// Check if this needs to be a deferred response,
// by checking the ThreadLocal callback being set
if (currentCallback.get() != null) {
currentCall.deferResponse();
currentCallback.set(null);
return null;
}
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = request.getValue(prototype);
Message result;
Call currentCall = Server.getCurCall().get();
try {
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
currentCallInfo.set(new CallInfo(server, methodName));
currentCall.setDetailedMetricsName(methodName);
result = service.callBlockingMethod(methodDescriptor, null, param);
// Check if this needs to be a deferred response,
// by checking the ThreadLocal callback being set
if (currentCallback.get() != null) {
currentCall.deferResponse();
currentCallback.set(null);
return null;
}
} catch (ServiceException e) {
Exception exception = (Exception) e.getCause();
currentCall.setDetailedMetricsName(
exception.getClass().getSimpleName());
throw (Exception) e.getCause();
} catch (Exception e) {
currentCall.setDetailedMetricsName(e.getClass().getSimpleName());
throw e;
} finally {
currentCallInfo.set(null);
}
return RpcWritable.wrap(result);
} catch (ServiceException e) {
Exception exception = (Exception) e.getCause();
currentCall
.setDetailedMetricsName(exception.getClass().getSimpleName());
throw (Exception) e.getCause();
} catch (Exception e) {
currentCall.setDetailedMetricsName(e.getClass().getSimpleName());
throw e;
} finally {
CURRENT_CALL_INFO.set(null);
}
return RpcWritable.wrap(result);
}
}

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.ipc;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.*;
import org.apache.hadoop.thirdparty.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -33,6 +30,12 @@ import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProt
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.thirdparty.protobuf.Message;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.tracing.Tracer;
@ -61,9 +64,16 @@ public class ProtobufRpcEngine2 implements RpcEngine {
ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for ProtobufRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
new Server.ProtoBufRpcInvoker());
registerProtocolEngine();
}
static void registerProtocolEngine() {
if (Server.getRpcInvoker(RPC.RpcKind.RPC_PROTOCOL_BUFFER) == null) {
org.apache.hadoop.ipc.Server
.registerProtocolEngine(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
ProtobufRpcEngine2.RpcProtobufRequest.class,
new Server.ProtoBufRpcInvoker());
}
}
private static final ClientCache CLIENTS = new ClientCache();
@ -383,6 +393,14 @@ public class ProtobufRpcEngine2 implements RpcEngine {
this.server = server;
this.methodName = methodName;
}
public RPC.Server getServer() {
return server;
}
public String getMethodName() {
return methodName;
}
}
static class ProtobufRpcEngineCallbackImpl
@ -394,9 +412,9 @@ public class ProtobufRpcEngine2 implements RpcEngine {
private final long setupTime;
ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().server;
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().methodName;
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
}
@ -417,7 +435,7 @@ public class ProtobufRpcEngine2 implements RpcEngine {
}
@InterfaceStability.Unstable
public static ProtobufRpcEngineCallback2 registerForDeferredResponse() {
public static ProtobufRpcEngineCallback2 registerForDeferredResponse2() {
ProtobufRpcEngineCallback2 callback = new ProtobufRpcEngineCallbackImpl();
CURRENT_CALLBACK.set(callback);
return callback;
@ -453,6 +471,17 @@ public class ProtobufRpcEngine2 implements RpcEngine {
protocolImpl);
}
//Use the latest protobuf rpc invoker itself as that is backward compatible.
private static final RpcInvoker RPC_INVOKER = new ProtoBufRpcInvoker();
@Override
protected RpcInvoker getServerRpcInvoker(RPC.RpcKind rpcKind) {
if (rpcKind == RPC.RpcKind.RPC_PROTOCOL_BUFFER) {
return RPC_INVOKER;
}
return super.getServerRpcInvoker(rpcKind);
}
/**
* Protobuf invoker for {@link RpcInvoker}.
*/
@ -524,6 +553,7 @@ public class ProtobufRpcEngine2 implements RpcEngine {
methodName, declaringClassProtoName, clientVersion);
}
@SuppressWarnings("deprecation")
protected Writable call(RPC.Server server, String connectionProtocolName,
RpcWritable.Buffer request, long receiveTime, String methodName,
String declaringClassProtoName, long clientVersion) throws Exception {
@ -534,6 +564,21 @@ public class ProtobufRpcEngine2 implements RpcEngine {
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
declaringClassProtoName, clientVersion);
if (protocolImpl.isShadedPBImpl()) {
return call(server, connectionProtocolName, request, methodName,
protocolImpl);
}
//Legacy protobuf implementation. Handle using legacy (Non-shaded)
// protobuf classes.
return ProtobufRpcEngine.Server
.processCall(server, connectionProtocolName, request, methodName,
protocolImpl);
}
private RpcWritable call(RPC.Server server,
String connectionProtocolName, RpcWritable.Buffer request,
String methodName, ProtoClassProtoImpl protocolImpl)
throws Exception {
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);

View File

@ -937,11 +937,18 @@ public class RPC {
*/
static class ProtoClassProtoImpl {
final Class<?> protocolClass;
final Object protocolImpl;
final Object protocolImpl;
private final boolean shadedPBImpl;
ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
this.protocolClass = protocolClass;
this.protocolImpl = protocolImpl;
this.shadedPBImpl = protocolImpl instanceof BlockingService;
}
public boolean isShadedPBImpl() {
return shadedPBImpl;
}
}
ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray =

View File

@ -17,12 +17,10 @@
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.TestProtos;
@ -30,38 +28,71 @@ import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtosLegacy;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtosLegacy;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
/**
* Test for testing protocol buffer based RPC mechanism.
* This test depends on test.proto definition of types in src/test/proto
* and protobuf service definition from src/test/test_rpc_service.proto
*/
@RunWith(Parameterized.class)
public class TestProtoBufRpc extends TestRpcBase {
private static RPC.Server server;
private final static int SLEEP_DURATION = 1000;
/**
* Test with legacy protobuf implementation in same server.
*/
private boolean testWithLegacy;
/**
* Test with legacy protobuf implementation loaded first while creating the
* RPC server.
*/
private boolean testWithLegacyFirst;
public TestProtoBufRpc(Boolean testWithLegacy, Boolean testWithLegacyFirst) {
this.testWithLegacy = testWithLegacy;
this.testWithLegacyFirst = testWithLegacyFirst;
}
@ProtocolInfo(protocolName = "testProto2", protocolVersion = 1)
public interface TestRpcService2 extends
TestProtobufRpc2Proto.BlockingInterface {
}
@ProtocolInfo(protocolName="testProtoLegacy", protocolVersion = 1)
public interface TestRpcService2Legacy
extends TestRpcServiceProtosLegacy.
TestProtobufRpc2Proto.BlockingInterface {
}
public static class PBServer2Impl implements TestRpcService2 {
@Override
@ -88,12 +119,58 @@ public class TestProtoBufRpc extends TestRpcBase {
}
}
public static class PBServer2ImplLegacy implements TestRpcService2Legacy {
@Override
public TestProtosLegacy.EmptyResponseProto ping2(
com.google.protobuf.RpcController unused,
TestProtosLegacy.EmptyRequestProto request)
throws com.google.protobuf.ServiceException {
return TestProtosLegacy.EmptyResponseProto.newBuilder().build();
}
@Override
public TestProtosLegacy.EchoResponseProto echo2(
com.google.protobuf.RpcController unused,
TestProtosLegacy.EchoRequestProto request)
throws com.google.protobuf.ServiceException {
return TestProtosLegacy.EchoResponseProto.newBuilder()
.setMessage(request.getMessage()).build();
}
@Override
public TestProtosLegacy.SleepResponseProto sleep(
com.google.protobuf.RpcController controller,
TestProtosLegacy.SleepRequestProto request)
throws com.google.protobuf.ServiceException {
try {
Thread.sleep(request.getMilliSeconds());
} catch (InterruptedException ex) {
}
return TestProtosLegacy.SleepResponseProto.newBuilder().build();
}
}
@Parameters
public static Collection<Object[]> params() {
Collection<Object[]> params = new ArrayList<Object[]>();
params.add(new Object[] {Boolean.TRUE, Boolean.TRUE });
params.add(new Object[] {Boolean.TRUE, Boolean.FALSE });
params.add(new Object[] {Boolean.FALSE, Boolean.FALSE });
return params;
}
@Before
@SuppressWarnings("deprecation")
public void setUp() throws IOException { // Setup server for both protocols
conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
// Set RPC engine to protobuf RPC engine
if (testWithLegacy) {
RPC.setProtocolEngine(conf, TestRpcService2Legacy.class,
ProtobufRpcEngine.class);
}
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine2.class);
RPC.setProtocolEngine(conf, TestRpcService2.class,
ProtobufRpcEngine2.class);
@ -103,9 +180,21 @@ public class TestProtoBufRpc extends TestRpcBase {
BlockingService service = TestProtobufRpcProto
.newReflectiveBlockingService(serverImpl);
// Get RPC server for server side implementation
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build();
if (testWithLegacy && testWithLegacyFirst) {
PBServer2ImplLegacy server2ImplLegacy = new PBServer2ImplLegacy();
com.google.protobuf.BlockingService legacyService =
TestRpcServiceProtosLegacy.TestProtobufRpc2Proto
.newReflectiveBlockingService(server2ImplLegacy);
server = new RPC.Builder(conf).setProtocol(TestRpcService2Legacy.class)
.setInstance(legacyService).setBindAddress(ADDRESS).setPort(PORT)
.build();
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
service);
} else {
// Get RPC server for server side implementation
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build();
}
addr = NetUtils.getConnectAddress(server);
// now the second protocol
@ -115,6 +204,16 @@ public class TestProtoBufRpc extends TestRpcBase {
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class,
service2);
if (testWithLegacy && !testWithLegacyFirst) {
PBServer2ImplLegacy server2ImplLegacy = new PBServer2ImplLegacy();
com.google.protobuf.BlockingService legacyService =
TestRpcServiceProtosLegacy.TestProtobufRpc2Proto
.newReflectiveBlockingService(server2ImplLegacy);
server
.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2Legacy.class,
legacyService);
}
server.start();
}
@ -128,6 +227,10 @@ public class TestProtoBufRpc extends TestRpcBase {
return RPC.getProxy(TestRpcService2.class, 0, addr, conf);
}
private TestRpcService2Legacy getClientLegacy() throws IOException {
return RPC.getProxy(TestRpcService2Legacy.class, 0, addr, conf);
}
@Test (timeout=5000)
public void testProtoBufRpc() throws Exception {
TestRpcService client = getClient(addr, conf);
@ -179,10 +282,39 @@ public class TestProtoBufRpc extends TestRpcBase {
MetricsRecordBuilder rpcDetailedMetrics =
getMetrics(server.getRpcDetailedMetrics().name());
assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics);
if (testWithLegacy) {
testProtobufLegacy();
}
}
private void testProtobufLegacy()
throws IOException, com.google.protobuf.ServiceException {
TestRpcService2Legacy client = getClientLegacy();
// Test ping method
client.ping2(null, TestProtosLegacy.EmptyRequestProto.newBuilder().build());
// Test echo method
TestProtosLegacy.EchoResponseProto echoResponse = client.echo2(null,
TestProtosLegacy.EchoRequestProto.newBuilder().setMessage("hello")
.build());
assertThat(echoResponse.getMessage()).isEqualTo("hello");
// Ensure RPC metrics are updated
MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertCounterGt("RpcQueueTimeNumOps", 0L, rpcMetrics);
assertCounterGt("RpcProcessingTimeNumOps", 0L, rpcMetrics);
MetricsRecordBuilder rpcDetailedMetrics =
getMetrics(server.getRpcDetailedMetrics().name());
assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics);
}
@Test (timeout=5000)
public void testProtoBufRandomException() throws Exception {
//No test with legacy
assumeFalse(testWithLegacy);
TestRpcService client = getClient(addr, conf);
try {
@ -200,6 +332,8 @@ public class TestProtoBufRpc extends TestRpcBase {
@Test(timeout=6000)
public void testExtraLongRpc() throws Exception {
//No test with legacy
assumeFalse(testWithLegacy);
TestRpcService2 client = getClient2();
final String shortString = StringUtils.repeat("X", 4);
// short message goes through
@ -219,6 +353,8 @@ public class TestProtoBufRpc extends TestRpcBase {
@Test(timeout = 12000)
public void testLogSlowRPC() throws IOException, ServiceException,
TimeoutException, InterruptedException {
//No test with legacy
assumeFalse(testWithLegacy);
TestRpcService2 client = getClient2();
// make 10 K fast calls
for (int x = 0; x < 10000; x++) {
@ -244,6 +380,8 @@ public class TestProtoBufRpc extends TestRpcBase {
@Test(timeout = 12000)
public void testEnsureNoLogIfDisabled() throws IOException, ServiceException {
//No test with legacy
assumeFalse(testWithLegacy);
// disable slow RPC logging
server.setLogSlowRPC(false);
TestRpcService2 client = getClient2();

View File

@ -145,7 +145,7 @@ public class TestProtoBufRpcServerHandoff {
ServiceException {
final long startTime = System.currentTimeMillis();
final ProtobufRpcEngineCallback2 callback =
ProtobufRpcEngine2.Server.registerForDeferredResponse();
ProtobufRpcEngine2.Server.registerForDeferredResponse2();
final long sleepTime = request.getSleepTime();
new Thread() {
@Override

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "TestProtosLegacy";
option java_generate_equals_and_hash = true;
package hadoop.common;
message EmptyRequestProto {
}
message EmptyResponseProto {
}
message EchoRequestProto {
required string message = 1;
}
message EchoResponseProto {
required string message = 1;
}
message OptRequestProto {
optional string message = 1;
}
message OptResponseProto {
optional string message = 1;
}
message SleepRequestProto{
required int32 milliSeconds = 1;
}
message SleepResponseProto{
}
message SlowPingRequestProto {
required bool shouldSlow = 1;
}
message EchoRequestProto2 {
repeated string message = 1;
}
message EchoResponseProto2 {
repeated string message = 1;
}
message AddRequestProto {
required int32 param1 = 1;
required int32 param2 = 2;
}
message AddRequestProto2 {
repeated int32 params = 1;
}
message AddResponseProto {
required int32 result = 1;
}
message ExchangeRequestProto {
repeated int32 values = 1;
}
message ExchangeResponseProto {
repeated int32 values = 1;
}
message AuthMethodResponseProto {
required int32 code = 1;
required string mechanismName = 2;
}
message UserResponseProto {
required string user = 1;
}
message SleepRequestProto2 {
optional int64 sleep_time = 1;
}
message SleepResponseProto2 {
optional int64 receive_time = 1;
optional int64 response_time = 2;
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "TestRpcServiceProtosLegacy";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.common;
import "test_legacy.proto";
/**
* A protobuf service for use in tests
*/
service TestProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(EchoRequestProto) returns (EchoResponseProto);
rpc error(EmptyRequestProto) returns (EmptyResponseProto);
rpc error2(EmptyRequestProto) returns (EmptyResponseProto);
rpc slowPing(SlowPingRequestProto) returns (EmptyResponseProto);
rpc echo2(EchoRequestProto2) returns (EchoResponseProto2);
rpc add(AddRequestProto) returns (AddResponseProto);
rpc add2(AddRequestProto2) returns (AddResponseProto);
rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto);
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
rpc lockAndSleep(SleepRequestProto) returns (EmptyResponseProto);
rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto);
rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto);
rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto);
rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto);
}
service TestProtobufRpc2Proto {
rpc ping2(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo2(EchoRequestProto) returns (EchoResponseProto);
rpc sleep(SleepRequestProto) returns (SleepResponseProto);
}
service OldProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(EmptyRequestProto) returns (EmptyResponseProto);
}
service NewProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(OptRequestProto) returns (OptResponseProto);
}
service NewerProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(EmptyRequestProto) returns (EmptyResponseProto);
}
service CustomProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
}
service TestProtobufRpcHandoffProto {
rpc sleep(SleepRequestProto2) returns (SleepResponseProto2);
}