HADOOP-17046. Support downstreams' existing Hadoop-rpc implementations using non-shaded protobuf classes (#2026)

This commit is contained in:
Vinayakumar B 2020-06-12 23:16:33 +05:30 committed by GitHub
parent 7c4de59fc1
commit e154084770
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
81 changed files with 2233 additions and 169 deletions

View File

@ -283,6 +283,10 @@
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtobufRpcEngineProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtobufRpcEngine2Protos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>

View File

@ -395,7 +395,12 @@
<executions>
<execution>
<id>src-compile-protoc</id>
<configuration><skip>false</skip></configuration>
<configuration>
<skip>false</skip>
<excludes>
<exclude>ProtobufRpcEngine.proto</exclude>
</excludes>
</configuration>
</execution>
<execution>
<id>src-test-compile-protoc</id>
@ -411,6 +416,9 @@
<id>replace-generated-sources</id>
<configuration>
<skip>false</skip>
<excludes>
<exclude>**/ProtobufRpcEngineProtos.java</exclude>
</excludes>
</configuration>
</execution>
<execution>
@ -423,6 +431,14 @@
<id>replace-sources</id>
<configuration>
<skip>false</skip>
<!--These classes have direct Protobuf references for backward compatibility reasons-->
<excludes>
<exclude>**/ProtobufHelper.java</exclude>
<exclude>**/RpcWritable.java</exclude>
<exclude>**/ProtobufRpcEngineCallback.java</exclude>
<exclude>**/ProtobufRpcEngine.java</exclude>
<exclude>**/ProtobufRpcEngineProtos.java</exclude>
</excludes>
</configuration>
</execution>
<execution>
@ -1015,7 +1031,79 @@
</plugins>
</build>
</profile>
<!-- profile to use already generated protobuf code using 2.5.0 for aarch64-->
<profile>
<id>aarch64</id>
<activation>
<activeByDefault>false</activeByDefault>
<os>
<arch>aarch64</arch>
</os>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source-legacy-protobuf</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/src/main/arm-java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<!-- profile to generate protobuf code using 2.5.0-->
<profile>
<id>x86_64</id>
<activation>
<activeByDefault>false</activeByDefault>
<os>
<arch>!aarch64</arch>
</os>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<executions>
<execution>
<id>src-compile-protoc-legacy</id>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<skip>false</skip>
<!--Generating with old protobuf version for backward compatibility-->
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<includeDependenciesInDescriptorSet>false</includeDependenciesInDescriptorSet>
<protoSourceRoot>${basedir}/src/main/proto</protoSourceRoot>
<outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
<includes>
<include>ProtobufRpcEngine.proto</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService;
import org.apache.hadoop.ha.protocolPB.ZKFCProtocolPB;
import org.apache.hadoop.ha.protocolPB.ZKFCProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.security.AccessControlException;
@ -51,7 +51,7 @@ public class ZKFCRpcServer implements ZKFCProtocol {
this.zkfc = zkfc;
RPC.setProtocolEngine(conf, ZKFCProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
ZKFCProtocolServerSideTranslatorPB translator =
new ZKFCProtocolServerSideTranslatorPB(this);
BlockingService service = ZKFCProtocolService

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequ
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToObserverRequestProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
@ -67,7 +67,7 @@ public class HAServiceProtocolClientSideTranslatorPB implements
public HAServiceProtocolClientSideTranslatorPB(InetSocketAddress addr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
RPC.getProtocolVersion(HAServiceProtocolPB.class), addr, conf);
}
@ -76,7 +76,7 @@ public class HAServiceProtocolClientSideTranslatorPB implements
InetSocketAddress addr, Configuration conf,
SocketFactory socketFactory, int timeout) throws IOException {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
RPC.getProtocolVersion(HAServiceProtocolPB.class), addr,
UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.AccessControlException;
@ -48,7 +48,7 @@ public class ZKFCProtocolClientSideTranslatorPB implements
InetSocketAddress addr, Configuration conf,
SocketFactory socketFactory, int timeout) throws IOException {
RPC.setProtocolEngine(conf, ZKFCProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
rpcProxy = RPC.getProxy(ZKFCProtocolPB.class,
RPC.getProtocolVersion(ZKFCProtocolPB.class), addr,
UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);

View File

@ -53,6 +53,23 @@ public class ProtobufHelper {
return e instanceof IOException ? (IOException) e : new IOException(se);
}
/**
* Kept for backward compatible.
* Return the IOException thrown by the remote server wrapped in
* ServiceException as cause.
* @param se ServiceException that wraps IO exception thrown by the server
* @return Exception wrapped in ServiceException or
* a new IOException that wraps the unexpected ServiceException.
*/
@Deprecated
public static IOException getRemoteException(
com.google.protobuf.ServiceException se) {
Throwable e = se.getCause();
if (e == null) {
return new IOException(se);
}
return e instanceof IOException ? (IOException) e : new IOException(se);
}
/**
* Map used to cache fixed strings to ByteStrings. Since there is no

View File

@ -19,8 +19,11 @@
package org.apache.hadoop.ipc;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.*;
import org.apache.hadoop.thirdparty.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -29,6 +32,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@ -52,7 +56,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* RPC Engine for for protobuf based RPCs.
* This engine uses Protobuf 2.5.0. Recommended to upgrade to Protobuf 3.x
* from hadoop-thirdparty and use ProtobufRpcEngine2.
*/
@Deprecated
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
public static final Logger LOG =
@ -355,6 +362,7 @@ public class ProtobufRpcEngine implements RpcEngine {
new ThreadLocal<>();
static final ThreadLocal<CallInfo> currentCallInfo = new ThreadLocal<>();
private static final RpcInvoker RPC_INVOKER = new ProtoBufRpcInvoker();
static class CallInfo {
private final RPC.Server server;
@ -433,7 +441,15 @@ public class ProtobufRpcEngine implements RpcEngine {
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl);
}
@Override
protected RpcInvoker getServerRpcInvoker(RpcKind rpcKind) {
if (rpcKind == RpcKind.RPC_PROTOCOL_BUFFER) {
return RPC_INVOKER;
}
return super.getServerRpcInvoker(rpcKind);
}
/**
* Protobuf invoker for {@link RpcInvoker}
*/

View File

@ -0,0 +1,598 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.*;
import org.apache.hadoop.thirdparty.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* RPC Engine for for protobuf based RPCs.
*/
@InterfaceStability.Evolving
public class ProtobufRpcEngine2 implements RpcEngine {
public static final Logger LOG =
LoggerFactory.getLogger(ProtobufRpcEngine2.class);
private static final ThreadLocal<AsyncGet<Message, Exception>>
ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for ProtobufRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
new Server.ProtoBufRpcInvoker());
}
private static final ClientCache CLIENTS = new ClientCache();
@Unstable
public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
return ASYNC_RETURN_MESSAGE.get();
}
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, null);
}
@Override
public <T> ProtocolProxy<T> getProxy(
Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy)
throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null, null);
}
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth,
alignmentContext);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[]{protocol}, new Invoker(protocol, connId, conf,
factory)), false);
}
private static final class Invoker implements RpcInvocationHandler {
private final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
private final Client.ConnectionId remoteId;
private final Client client;
private final long clientProtocolVersion;
private final String protocolName;
private AtomicBoolean fallbackToSimpleAuth;
private AlignmentContext alignmentContext;
private Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.alignmentContext = alignmentContext;
}
/**
* This constructor takes a connectionId, instead of creating a new one.
*/
private Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
}
private RequestHeaderProto constructRpcRequestHeader(Method method) {
RequestHeaderProto.Builder builder = RequestHeaderProto
.newBuilder();
builder.setMethodName(method.getName());
// For protobuf, {@code protocol} used when creating client side proxy is
// the interface extending BlockingInterface, which has the annotations
// such as ProtocolName etc.
//
// Using Method.getDeclaringClass(), as in WritableEngine to get at
// the protocol interface will return BlockingInterface, from where
// the annotation ProtocolName and Version cannot be
// obtained.
//
// Hence we simply use the protocol class used to create the proxy.
// For PB this may limit the use of mixins on client side.
builder.setDeclaringClassProtocolName(protocolName);
builder.setClientProtocolVersion(clientProtocolVersion);
return builder.build();
}
/**
* This is the client side invoker of RPC method. It only throws
* ServiceException, since the invocation proxy expects only
* ServiceException to be thrown by the method in case protobuf service.
*
* ServiceException has the following causes:
* <ol>
* <li>Exceptions encountered on the client side in this method are
* set as cause in ServiceException as is.</li>
* <li>Exceptions from the server are wrapped in RemoteException and are
* set as cause in ServiceException</li>
* </ol>
*
* Note that the client calling protobuf RPC methods, must handle
* ServiceException by getting the cause from the ServiceException. If the
* cause is RemoteException, then unwrap it to get the exception thrown by
* the server.
*/
@Override
public Message invoke(Object proxy, final Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = Time.now();
}
if (args.length != 2) { // RpcController + Message
throw new ServiceException(
"Too many or few parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ args.length);
}
if (args[1] == null) {
throw new ServiceException("null param while calling Method: ["
+ method.getName() + "]");
}
// if Tracing is on then start a new span for this rpc.
// guard it in the if statement to make sure there isn't
// any extra string manipulation.
Tracer tracer = Tracer.curThreadTracer();
TraceScope traceScope = null;
if (tracer != null) {
traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
}
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Call -> " +
remoteId + ": " + method.getName() +
" {" + TextFormat.shortDebugString((Message) args[1]) + "}");
}
final Message theRequest = (Message) args[1];
final RpcWritable.Buffer val;
try {
val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth, alignmentContext);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
remoteId + ": " + method.getName() +
" {" + e + "}");
}
if (traceScope != null) {
traceScope.addTimelineAnnotation("Call got exception: " +
e.toString());
}
throw new ServiceException(e);
} finally {
if (traceScope != null) {
traceScope.close();
}
}
if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
}
if (Client.isAsynchronousMode()) {
final AsyncGet<RpcWritable.Buffer, IOException> arr
= Client.getAsyncRpcResponse();
final AsyncGet<Message, Exception> asyncGet =
new AsyncGet<Message, Exception>() {
@Override
public Message get(long timeout, TimeUnit unit) throws Exception {
return getReturnMessage(method, arr.get(timeout, unit));
}
@Override
public boolean isDone() {
return arr.isDone();
}
};
ASYNC_RETURN_MESSAGE.set(asyncGet);
return null;
} else {
return getReturnMessage(method, val);
}
}
private Message getReturnMessage(final Method method,
final RpcWritable.Buffer buf) throws ServiceException {
Message prototype = null;
try {
prototype = getReturnProtoType(method);
} catch (Exception e) {
throw new ServiceException(e);
}
Message returnMessage;
try {
returnMessage = buf.getValue(prototype.getDefaultInstanceForType());
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Response <- " +
remoteId + ": " + method.getName() +
" {" + TextFormat.shortDebugString(returnMessage) + "}");
}
} catch (Throwable e) {
throw new ServiceException(e);
}
return returnMessage;
}
@Override
public void close() throws IOException {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
private Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
}
Class<?> returnType = method.getReturnType();
Method newInstMethod = returnType.getMethod("getDefaultInstance");
newInstMethod.setAccessible(true);
Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null);
returnTypes.put(method.getName(), prototype);
return prototype;
}
@Override //RpcInvocationHandler
public ConnectionId getConnectionId() {
return remoteId;
}
}
@VisibleForTesting
@InterfaceAudience.Private
@InterfaceStability.Unstable
static Client getClient(Configuration conf) {
return CLIENTS.getClient(conf, SocketFactory.getDefault(),
RpcWritable.Buffer.class);
}
@Override
public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
String bindAddress, int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
return new Server(protocol, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig, alignmentContext);
}
public static class Server extends RPC.Server {
static final ThreadLocal<ProtobufRpcEngineCallback2> CURRENT_CALLBACK =
new ThreadLocal<>();
static final ThreadLocal<CallInfo> CURRENT_CALL_INFO = new ThreadLocal<>();
static class CallInfo {
private final RPC.Server server;
private final String methodName;
CallInfo(RPC.Server server, String methodName) {
this.server = server;
this.methodName = methodName;
}
}
static class ProtobufRpcEngineCallbackImpl
implements ProtobufRpcEngineCallback2 {
private final RPC.Server server;
private final Call call;
private final String methodName;
private final long setupTime;
ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().server;
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().methodName;
this.setupTime = Time.now();
}
@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
}
@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
call.setDeferredError(t);
}
}
@InterfaceStability.Unstable
public static ProtobufRpcEngineCallback2 registerForDeferredResponse() {
ProtobufRpcEngineCallback2 callback = new ProtobufRpcEngineCallbackImpl();
CURRENT_CALLBACK.set(callback);
return callback;
}
/**
* Construct an RPC server.
*
* @param protocolClass the class of protocol
* @param protocolImpl the protocolImpl whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
* @param portRangeConfig A config parameter that can be used to restrict
* the range of ports used when port is 0 (an ephemeral port)
* @param alignmentContext provides server state info on client responses
*/
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port, int numHandlers,
int numReaders, int queueSizePerHandler, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
super(bindAddress, port, null, numHandlers,
numReaders, queueSizePerHandler, conf,
serverNameFromClass(protocolImpl.getClass()), secretManager,
portRangeConfig);
setAlignmentContext(alignmentContext);
this.verbose = verbose;
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl);
}
/**
* Protobuf invoker for {@link RpcInvoker}.
*/
static class ProtoBufRpcInvoker implements RpcInvoker {
private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
String protoName, long clientVersion) throws RpcServerException {
ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
ProtoClassProtoImpl impl =
server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
if (impl == null) { // no match for Protocol AND Version
VerProtocolImpl highest = server.getHighestSupportedProtocol(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, protoName);
if (highest == null) {
throw new RpcNoSuchProtocolException(
"Unknown protocol: " + protoName);
}
// protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
return impl;
}
@Override
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the response.
* See {@link HadoopRpcResponseProto}
*
* In this method there three types of exceptions possible and they are
* returned in response as follows.
* <ol>
* <li> Exceptions encountered in this method that are returned
* as {@link RpcServerException} </li>
* <li> Exceptions thrown by the service is wrapped in ServiceException.
* In that this method returns in response the exception thrown by the
* service.</li>
* <li> Other exceptions thrown by the service. They are returned as
* it is.</li>
* </ol>
*/
public Writable call(RPC.Server server, String connectionProtocolName,
Writable writableRequest, long receiveTime) throws Exception {
RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
RequestHeaderProto rpcRequest = request.getRequestHeader();
String methodName = rpcRequest.getMethodName();
/**
* RPCs for a particular interface (ie protocol) are done using a
* IPC connection that is setup using rpcProxy.
* The rpcProxy's has a declared protocol name that is
* sent form client to server at connection time.
*
* Each Rpc call also sends a protocol name
* (called declaringClassprotocolName). This name is usually the same
* as the connection protocol name except in some cases.
* For example metaProtocols such ProtocolInfoProto which get info
* about the protocol reuse the connection but need to indicate that
* the actual protocol is different (i.e. the protocol is
* ProtocolInfoProto) since they reuse the connection; in this case
* the declaringClassProtocolName field is set to the ProtocolInfoProto.
*/
String declaringClassProtoName =
rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
if (server.verbose) {
LOG.info("Call: connectionProtocolName=" + connectionProtocolName +
", method=" + methodName);
}
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
declaringClassProtoName, clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
if (methodDescriptor == null) {
String msg = "Unknown method " + methodName + " called on "
+ connectionProtocolName + " protocol.";
LOG.warn(msg);
throw new RpcNoSuchMethodException(msg);
}
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = request.getValue(prototype);
Message result;
Call currentCall = Server.getCurCall().get();
try {
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
CURRENT_CALL_INFO.set(new CallInfo(server, methodName));
currentCall.setDetailedMetricsName(methodName);
result = service.callBlockingMethod(methodDescriptor, null, param);
// Check if this needs to be a deferred response,
// by checking the ThreadLocal callback being set
if (CURRENT_CALLBACK.get() != null) {
currentCall.deferResponse();
CURRENT_CALLBACK.set(null);
return null;
}
} catch (ServiceException e) {
Exception exception = (Exception) e.getCause();
currentCall.setDetailedMetricsName(
exception.getClass().getSimpleName());
throw (Exception) e.getCause();
} catch (Exception e) {
currentCall.setDetailedMetricsName(e.getClass().getSimpleName());
throw e;
} finally {
CURRENT_CALL_INFO.set(null);
}
return RpcWritable.wrap(result);
}
}
}
// htrace in the ipc layer creates the span name based on toString()
// which uses the rpc header. in the normal case we want to defer decoding
// the rpc header until needed by the rpc engine.
static class RpcProtobufRequest extends RpcWritable.Buffer {
private volatile RequestHeaderProto requestHeader;
private Message payload;
RpcProtobufRequest() {
}
RpcProtobufRequest(RequestHeaderProto header, Message payload) {
this.requestHeader = header;
this.payload = payload;
}
RequestHeaderProto getRequestHeader() throws IOException {
if (getByteBuffer() != null && requestHeader == null) {
requestHeader = getValue(RequestHeaderProto.getDefaultInstance());
}
return requestHeader;
}
@Override
public void writeTo(ResponseBuffer out) throws IOException {
requestHeader.writeDelimitedTo(out);
if (payload != null) {
payload.writeDelimitedTo(out);
}
}
// this is used by htrace to name the span.
@Override
public String toString() {
try {
RequestHeaderProto header = getRequestHeader();
return header.getDeclaringClassProtocolName() + "." +
header.getMethodName();
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
}

View File

@ -18,12 +18,17 @@
package org.apache.hadoop.ipc;
import org.apache.hadoop.thirdparty.protobuf.Message;
import com.google.protobuf.Message;
/**
* This engine uses Protobuf 2.5.0. Recommended to upgrade to Protobuf 3.x
* from hadoop-thirdparty and use ProtobufRpcEngineCallback2.
*/
@Deprecated
public interface ProtobufRpcEngineCallback {
public void setResponse(Message message);
void setResponse(Message message);
public void error(Throwable t);
void error(Throwable t);
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.thirdparty.protobuf.Message;
public interface ProtobufRpcEngineCallback2 {
public void setResponse(Message message);
public void error(Throwable t);
}

View File

@ -1043,7 +1043,7 @@ public class RPC {
private void initProtocolMetaInfo(Configuration conf) {
RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
ProtocolMetaInfoServerSideTranslatorPB xlator =
new ProtocolMetaInfoServerSideTranslatorPB(this);
BlockingService protocolInfoBlockingService = ProtocolInfoService
@ -1067,7 +1067,7 @@ public class RPC {
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
return getServerRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}
}

View File

@ -114,7 +114,7 @@ public class RpcClientUtil {
if (versionMap == null) {
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
ProtocolMetaInfoPB protocolInfoProxy = getProtocolMetaInfoProxy(rpcProxy,
conf);
GetProtocolSignatureRequestProto.Builder builder =

View File

@ -42,6 +42,8 @@ public abstract class RpcWritable implements Writable {
return (RpcWritable)o;
} else if (o instanceof Message) {
return new ProtobufWrapper((Message)o);
} else if (o instanceof com.google.protobuf.Message) {
return new ProtobufWrapperLegacy((com.google.protobuf.Message) o);
} else if (o instanceof Writable) {
return new WritableWrapper((Writable)o);
}
@ -132,6 +134,49 @@ public abstract class RpcWritable implements Writable {
}
}
// adapter for Protobufs.
static class ProtobufWrapperLegacy extends RpcWritable {
private com.google.protobuf.Message message;
ProtobufWrapperLegacy(com.google.protobuf.Message message) {
this.message = message;
}
com.google.protobuf.Message getMessage() {
return message;
}
@Override
void writeTo(ResponseBuffer out) throws IOException {
int length = message.getSerializedSize();
length += com.google.protobuf.CodedOutputStream.
computeUInt32SizeNoTag(length);
out.ensureCapacity(length);
message.writeDelimitedTo(out);
}
@SuppressWarnings("unchecked")
@Override
<T> T readFrom(ByteBuffer bb) throws IOException {
// using the parser with a byte[]-backed coded input stream is the
// most efficient way to deserialize a protobuf. it has a direct
// path to the PB ctor that doesn't create multi-layered streams
// that internally buffer.
com.google.protobuf.CodedInputStream cis =
com.google.protobuf.CodedInputStream.newInstance(
bb.array(), bb.position() + bb.arrayOffset(), bb.remaining());
try {
cis.pushLimit(cis.readRawVarint32());
message = message.getParserForType().parseFrom(cis);
cis.checkLastTagWas(0);
} finally {
// advance over the bytes read.
bb.position(bb.position() + cis.getTotalBytesRead());
}
return (T)message;
}
}
/**
* adapter to allow decoding of writables and protobufs from a byte buffer.
*/

View File

@ -304,7 +304,11 @@ public abstract class Server {
RpcKindMapValue val = rpcKindMap.get(ProtoUtil.convert(rpcKind));
return (val == null) ? null : val.rpcRequestWrapperClass;
}
protected RpcInvoker getServerRpcInvoker(RPC.RpcKind rpcKind) {
return getRpcInvoker(rpcKind);
}
public static RpcInvoker getRpcInvoker(RPC.RpcKind rpcKind) {
RpcKindMapValue val = rpcKindMap.get(rpcKind);
return (val == null) ? null : val.rpcInvoker;
@ -2688,15 +2692,15 @@ public abstract class Server {
call.setPriorityLevel(callQueue.getPriorityLevel(call));
call.markCallCoordinated(false);
if(alignmentContext != null && call.rpcRequest != null &&
(call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
(call.rpcRequest instanceof ProtobufRpcEngine2.RpcProtobufRequest)) {
// if call.rpcRequest is not RpcProtobufRequest, will skip the following
// step and treat the call as uncoordinated. As currently only certain
// ClientProtocol methods request made through RPC protobuf needs to be
// coordinated.
String methodName;
String protoName;
ProtobufRpcEngine.RpcProtobufRequest req =
(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
ProtobufRpcEngine2.RpcProtobufRequest req =
(ProtobufRpcEngine2.RpcProtobufRequest) call.rpcRequest;
try {
methodName = req.getRequestHeader().getMethodName();
protoName = req.getRequestHeader().getDeclaringClassProtocolName();

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -179,7 +179,7 @@ public class TraceAdmin extends Configured implements Tool {
servicePrincipal);
}
RPC.setProtocolEngine(getConf(), TraceAdminProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
InetSocketAddress address = NetUtils.createSocketAddr(hostPort);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Class<?> xface = TraceAdminProtocolPB.class;

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and stable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *stable* .proto interface.
*/
syntax = "proto2";
/**
* These are the messages used by Hadoop RPC for the Rpc Engine Protocol Buffer
* to marshal the request and response in the RPC layer.
* The messages are sent in addition to the normal RPC header as
* defined in RpcHeader.proto
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "ProtobufRpcEngine2Protos";
option java_generate_equals_and_hash = true;
package hadoop.common;
/**
* This message is the header for the Protobuf Rpc Engine
* when sending a RPC request from RPC client to the RPC server.
* The actual request (serialized as protobuf) follows this request.
*
* No special header is needed for the Rpc Response for Protobuf Rpc Engine.
* The normal RPC response header (see RpcHeader.proto) are sufficient.
*/
message RequestHeaderProto {
/** Name of the RPC method */
required string methodName = 1;
/**
* RPCs for a particular interface (ie protocol) are done using a
* IPC connection that is setup using rpcProxy.
* The rpcProxy's has a declared protocol name that is
* sent form client to server at connection time.
*
* Each Rpc call also sends a protocol name
* (called declaringClassprotocolName). This name is usually the same
* as the connection protocol name except in some cases.
* For example metaProtocols such ProtocolInfoProto which get metainfo
* about the protocol reuse the connection but need to indicate that
* the actual protocol is different (i.e. the protocol is
* ProtocolInfoProto) since they reuse the connection; in this case
* the declaringClassProtocolName field is set to the ProtocolInfoProto
*/
required string declaringClassProtocolName = 2;
/** protocol version of class declaring the called method */
required uint64 clientProtocolVersion = 3;
}

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
@ -119,7 +119,7 @@ class DummyHAService extends HAServiceTarget {
try {
RPC.setProtocolEngine(conf,
HAServiceProtocolPB.class, ProtobufRpcEngine.class);
HAServiceProtocolPB.class, ProtobufRpcEngine2.class);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(new MockHAProtocolImpl());
BlockingService haPbService = HAServiceProtocolService

View File

@ -66,7 +66,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
public int secondsToRun = 15;
private int msgSize = 1024;
public Class<? extends RpcEngine> rpcEngine =
ProtobufRpcEngine.class;
ProtobufRpcEngine2.class;
private MyOptions(String args[]) {
try {
@ -181,7 +181,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
if (line.hasOption('e')) {
String eng = line.getOptionValue('e');
if ("protobuf".equals(eng)) {
rpcEngine = ProtobufRpcEngine.class;
rpcEngine = ProtobufRpcEngine2.class;
} else {
throw new ParseException("invalid engine: " + eng);
}
@ -224,7 +224,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
RPC.Server server;
// Get RPC server for server side implementation
if (opts.rpcEngine == ProtobufRpcEngine.class) {
if (opts.rpcEngine == ProtobufRpcEngine2.class) {
// Create server side implementation
PBServerImpl serverImpl = new PBServerImpl();
BlockingService service = TestProtobufRpcProto
@ -378,7 +378,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
private RpcServiceWrapper createRpcClient(MyOptions opts) throws IOException {
InetSocketAddress addr = NetUtils.createSocketAddr(opts.host, opts.getPort());
if (opts.rpcEngine == ProtobufRpcEngine.class) {
if (opts.rpcEngine == ProtobufRpcEngine2.class) {
final TestRpcService proxy = RPC.getProxy(TestRpcService.class, 0, addr, conf);
return new RpcServiceWrapper() {
@Override

View File

@ -45,7 +45,7 @@ public class TestMultipleProtocolServer extends TestRpcBase {
// Set RPC engine to protobuf RPC engine
Configuration conf2 = new Configuration();
RPC.setProtocolEngine(conf2, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
TestRpcService client = RPC.getProxy(TestRpcService.class, 0, addr, conf2);
TestProtoBufRpc.testProtoBufRpc(client);
}

View File

@ -25,8 +25,6 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.OptRequestProto;
@ -138,7 +136,7 @@ public class TestProtoBufRPCCompatibility {
conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, NewRpcService.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, NewRpcService.class, ProtobufRpcEngine2.class);
// Create server side implementation
NewServerImpl serverImpl = new NewServerImpl();
@ -151,7 +149,7 @@ public class TestProtoBufRPCCompatibility {
server.start();
RPC.setProtocolEngine(conf, OldRpcService.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, OldRpcService.class, ProtobufRpcEngine2.class);
OldRpcService proxy = RPC.getProxy(OldRpcService.class, 0, addr, conf);
// Verify that exception is thrown if protocolVersion is mismatch between
@ -168,7 +166,8 @@ public class TestProtoBufRPCCompatibility {
}
// Verify that missing of optional field is still compatible in RPC call.
RPC.setProtocolEngine(conf, NewerRpcService.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, NewerRpcService.class,
ProtobufRpcEngine2.class);
NewerRpcService newProxy = RPC.getProxy(NewerRpcService.class, 0, addr,
conf);
newProxy.echo(null, emptyRequest);

View File

@ -94,8 +94,9 @@ public class TestProtoBufRpc extends TestRpcBase {
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, TestRpcService2.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine2.class);
RPC.setProtocolEngine(conf, TestRpcService2.class,
ProtobufRpcEngine2.class);
// Create server side implementation
PBServerImpl serverImpl = new PBServerImpl();

View File

@ -52,7 +52,7 @@ public class TestProtoBufRpcServerHandoff {
TestProtobufRpcHandoffProto.newReflectiveBlockingService(serverImpl);
RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
RPC.Server server = new RPC.Builder(conf)
.setProtocol(TestProtoBufRpcServerHandoffProtocol.class)
.setInstance(blockingService)
@ -144,8 +144,8 @@ public class TestProtoBufRpcServerHandoff {
TestProtos.SleepRequestProto2 request) throws
ServiceException {
final long startTime = System.currentTimeMillis();
final ProtobufRpcEngineCallback callback =
ProtobufRpcEngine.Server.registerForDeferredResponse();
final ProtobufRpcEngineCallback2 callback =
ProtobufRpcEngine2.Server.registerForDeferredResponse();
final long sleepTime = request.getSleepTime();
new Thread() {
@Override

View File

@ -114,19 +114,19 @@ public class TestRPCCompatibility {
ProtocolSignature.resetCache();
RPC.setProtocolEngine(conf,
TestProtocol0.class, ProtobufRpcEngine.class);
TestProtocol0.class, ProtobufRpcEngine2.class);
RPC.setProtocolEngine(conf,
TestProtocol1.class, ProtobufRpcEngine.class);
TestProtocol1.class, ProtobufRpcEngine2.class);
RPC.setProtocolEngine(conf,
TestProtocol2.class, ProtobufRpcEngine.class);
TestProtocol2.class, ProtobufRpcEngine2.class);
RPC.setProtocolEngine(conf,
TestProtocol3.class, ProtobufRpcEngine.class);
TestProtocol3.class, ProtobufRpcEngine2.class);
RPC.setProtocolEngine(conf,
TestProtocol4.class, ProtobufRpcEngine.class);
TestProtocol4.class, ProtobufRpcEngine2.class);
}
@After

View File

@ -44,7 +44,7 @@ public class TestRPCWaitForProxy extends TestRpcBase {
@Before
public void setupProtocolEngine() {
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
}
/**

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.TestConnectionRetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.TestRpcBase.TestRpcService;
import org.junit.Before;
import org.junit.Test;
@ -129,7 +128,7 @@ public class TestReuseRpcConnections extends TestRpcBase {
try {
proxy1 = getClient(addr, newConf, retryPolicy1);
proxy1.ping(null, newEmptyRequest());
client = ProtobufRpcEngine.getClient(newConf);
client = ProtobufRpcEngine2.getClient(newConf);
final Set<ConnectionId> conns = client.getConnectionIds();
assertEquals("number of connections in cache is wrong", 1, conns.size());

View File

@ -70,7 +70,7 @@ public class TestRpcBase {
protected void setupConf() {
conf = new Configuration();
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
}

View File

@ -169,7 +169,7 @@ public class TestSaslRPC extends TestRpcBase {
clientFallBackToSimpleAllowed = true;
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine2.class);
}
static String getQOPNames (QualityOfProtection[] qops){
@ -356,7 +356,7 @@ public class TestSaslRPC extends TestRpcBase {
newConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, timeouts[0]);
proxy1 = getClient(addr, newConf);
proxy1.getAuthMethod(null, newEmptyRequest());
client = ProtobufRpcEngine.getClient(newConf);
client = ProtobufRpcEngine2.getClient(newConf);
Set<ConnectionId> conns = client.getConnectionIds();
assertEquals("number of connections in cache is wrong", 1, conns.size());
// same conf, connection should be re-used

View File

@ -21,7 +21,7 @@ import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestRpcBase;
@ -151,7 +151,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 5);
@ -181,7 +181,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1");
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 5);
@ -215,7 +215,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1");
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 5);
@ -251,7 +251,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 2);
@ -286,7 +286,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
final Configuration conf = new Configuration();
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 2);
@ -322,7 +322,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group3");
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 2);
@ -363,7 +363,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
TestTokenSecretManager sm = new TestTokenSecretManager();
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 5, sm);
@ -411,7 +411,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf);
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(newConf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(newConf);
final Server server = setupTestServer(newConf, 5, sm);

View File

@ -56,7 +56,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@ -355,7 +355,7 @@ public class NameNodeProxiesClient {
AlignmentContext alignmentContext)
throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
final RetryPolicy defaultPolicy =
RetryUtils.getDefaultRetryPolicy(

View File

@ -68,7 +68,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@ -181,7 +181,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int socketTimeout) throws IOException {
RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
return RPC.getProxy(ClientDatanodeProtocolPB.class,
RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), addr, ticket,
conf, factory, socketTimeout);

View File

@ -237,7 +237,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.AsyncCallHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@ -456,7 +456,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
private void setAsyncReturnValue() {
final AsyncGet<Message, Exception> asyncReturnMessage
= ProtobufRpcEngine.getAsyncReturnMessage();
= ProtobufRpcEngine2.getAsyncReturnMessage();
final AsyncGet<Void, Exception> asyncGet
= new AsyncGet<Void, Exception>() {
@Override
@ -1570,7 +1570,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
if (Client.isAsynchronousMode()) {
rpcProxy.getAclStatus(null, req);
final AsyncGet<Message, Exception> asyncReturnMessage
= ProtobufRpcEngine.getAsyncReturnMessage();
= ProtobufRpcEngine2.getAsyncReturnMessage();
final AsyncGet<AclStatus, Exception> asyncGet
= new AsyncGet<AclStatus, Exception>() {
@Override

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListR
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@ -84,7 +84,7 @@ public class ReconfigurationProtocolTranslatorPB implements
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int socketTimeout) throws IOException {
RPC.setProtocolEngine(conf, ReconfigurationProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
return RPC.getProxy(ReconfigurationProtocolPB.class,
RPC.getProtocolVersion(ReconfigurationProtocolPB.class),
addr, ticket, conf, factory, socketTimeout);

View File

@ -47,7 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
@ -379,7 +379,7 @@ public class ConnectionPool {
throw new IllegalStateException(msg);
}
ProtoImpl classes = PROTO_MAP.get(proto);
RPC.setProtocolEngine(conf, classes.protoPb, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, classes.protoPb, ProtobufRpcEngine2.class);
final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,

View File

@ -77,7 +77,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableE
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RefreshRegistry;
@ -139,7 +139,7 @@ public class RouterAdminServer extends AbstractService
RBFConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT);
RPC.setProtocolEngine(this.conf, RouterAdminProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
RouterAdminProtocolServerSideTranslatorPB routerAdminProtocolTranslator =
new RouterAdminProtocolServerSideTranslatorPB(this);

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterGenericManager;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -47,7 +47,7 @@ public class RouterClient implements Closeable {
throws IOException {
RPC.setProtocolEngine(
conf, RouterAdminProtocolPB.class, ProtobufRpcEngine.class);
conf, RouterAdminProtocolPB.class, ProtobufRpcEngine2.class);
AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
final long version = RPC.getProtocolVersion(RouterAdminProtocolPB.class);

View File

@ -133,7 +133,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RemoteException;
@ -256,7 +256,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
readerQueueSize);
RPC.setProtocolEngine(this.conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
ClientNamenodeProtocolServerSideTranslatorPB
clientProtocolServerTranslator =

View File

@ -71,7 +71,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableE
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.ipc.RemoteException;
@ -1222,7 +1222,7 @@ public class RouterAdmin extends Configured implements Tool {
InetSocketAddress address = NetUtils.createSocketAddr(hostport);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine2.class);
GenericRefreshProtocolPB proxy = (GenericRefreshProtocolPB)RPC.getProxy(
xface, RPC.getProtocolVersion(xface), address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), 0);

View File

@ -90,7 +90,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RemoteException;
@ -174,7 +174,7 @@ public class MockNamenode {
*/
private void setupRPCServer(final Configuration conf) throws IOException {
RPC.setProtocolEngine(
conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class);
ClientNamenodeProtocolServerSideTranslatorPB
clientNNProtoXlator =
new ClientNamenodeProtocolServerSideTranslatorPB(mockNn);

View File

@ -349,6 +349,9 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<id>replace-sources</id>
<configuration>
<skip>false</skip>
<excludes>
<exclude>**/DFSUtil.java</exclude>
</excludes>
</configuration>
</execution>
<execution>

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.security.AccessControlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -92,7 +93,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.web.AuthFilterInitializer;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
@ -1295,6 +1296,27 @@ public class DFSUtil {
*/
public static void addPBProtocol(Configuration conf, Class<?> protocol,
BlockingService service, RPC.Server server) throws IOException {
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine2.class);
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
}
/**
* Add protobuf based protocol to the {@link RPC.Server}.
* This engine uses Protobuf 2.5.0. Recommended to upgrade to
* Protobuf 3.x from hadoop-thirdparty and use
* {@link DFSUtil#addPBProtocol(Configuration, Class, BlockingService,
* RPC.Server)}.
* @param conf configuration
* @param protocol Protocol interface
* @param service service that implements the protocol
* @param server RPC server to which the protocol &amp; implementation is
* added to
* @throws IOException
*/
@Deprecated
public static void addPBProtocol(Configuration conf, Class<?> protocol,
com.google.protobuf.BlockingService service, RPC.Server server)
throws IOException {
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
}

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProxyCombiner;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
@ -305,7 +305,7 @@ public class NameNodeProxies {
private static <T> T createNameNodeProxy(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi, Class<T> xface,
int rpcTimeout, AlignmentContext alignmentContext) throws IOException {
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine2.class);
return RPC.getProtocolProxy(xface,
RPC.getProtocolVersion(xface), address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), rpcTimeout, null, null,

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
@ -57,7 +57,7 @@ public class DatanodeLifelineProtocolClientSideTranslatorPB implements
public DatanodeLifelineProtocolClientSideTranslatorPB(
InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DatanodeLifelineProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
}

View File

@ -62,7 +62,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
@ -99,7 +99,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
}

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
@ -62,7 +62,7 @@ public class InterDatanodeProtocolTranslatorPB implements
int socketTimeout)
throws IOException {
RPC.setProtocolEngine(conf, InterDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
rpcProxy = RPC.getProxy(InterDatanodeProtocolPB.class,
RPC.getProtocolVersion(InterDatanodeProtocolPB.class), addr, ugi, conf,
factory, socketTimeout);

View File

@ -52,7 +52,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StopWatch;
@ -235,13 +235,13 @@ public class IPCLoggerChannel implements AsyncLogger {
true);
RPC.setProtocolEngine(confCopy,
QJournalProtocolPB.class, ProtobufRpcEngine.class);
QJournalProtocolPB.class, ProtobufRpcEngine2.class);
return SecurityUtil.doAsLoginUser(
new PrivilegedExceptionAction<QJournalProtocol>() {
@Override
public QJournalProtocol run() throws IOException {
RPC.setProtocolEngine(confCopy,
QJournalProtocolPB.class, ProtobufRpcEngine.class);
QJournalProtocolPB.class, ProtobufRpcEngine2.class);
QJournalProtocolPB pbproxy = RPC.getProxy(
QJournalProtocolPB.class,
RPC.getProtocolVersion(QJournalProtocolPB.class),

View File

@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTran
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.net.NetUtils;
@ -85,7 +85,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
LOG.info("RPC server is binding to " + bindHost + ":" + addr.getPort());
RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
QJournalProtocolServerSideTranslatorPB translator =
new QJournalProtocolServerSideTranslatorPB(this);
BlockingService service = QJournalProtocolService

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@ -505,7 +505,7 @@ public class JournalNodeSyncer {
@Override
public InterQJournalProtocol run() throws IOException {
RPC.setProtocolEngine(confCopy, InterQJournalProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
InterQJournalProtocolPB interQJournalProtocolPB = RPC.getProxy(
InterQJournalProtocolPB.class,
RPC.getProtocolVersion(InterQJournalProtocolPB.class),

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.aliasmap;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
@ -71,7 +71,7 @@ public class InMemoryLevelDBAliasMapServer implements InMemoryAliasMapProtocol,
public void start() throws IOException {
RPC.setProtocolEngine(getConf(), AliasMapProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
AliasMapProtocolServerSideTranslatorPB aliasMapProtocolXlator =
new AliasMapProtocolServerSideTranslatorPB(this);

View File

@ -188,7 +188,7 @@ import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
@ -1015,7 +1015,7 @@ public class DataNode extends ReconfigurableBase
// Add all the RPC protocols that the Datanode implements
RPC.setProtocolEngine(getConf(), ClientDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator =
new ClientDatanodeProtocolServerSideTranslatorPB(this);
BlockingService service = ClientDatanodeProtocolService

View File

@ -184,7 +184,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RetryCache;
@ -281,7 +281,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
ClientNamenodeProtocolServerSideTranslatorPB
clientProtocolServerTranslator =
@ -405,7 +405,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf);
if (lifelineRpcAddr != null) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
String bindHost = nn.getLifelineRpcServerBindHost(conf);
if (bindHost == null) {
bindHost = lifelineRpcAddr.getHostName();

View File

@ -87,7 +87,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.RefreshResponse;
@ -2029,7 +2029,7 @@ public class DFSAdmin extends FsShell {
InetSocketAddress address = NetUtils.createSocketAddr(hostport);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine2.class);
GenericRefreshProtocolPB proxy = (GenericRefreshProtocolPB)
RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
ugi, conf, NetUtils.getDefaultSocketFactory(conf), 0);

View File

@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Rule;
@ -225,7 +225,7 @@ public class TestQJMWithFaults {
// If the user specifies a seed, then we should gather all the
// IPC trace information so that debugging is easier. This makes
// the test run about 25% slower otherwise.
GenericTestUtils.setLogLevel(ProtobufRpcEngine.LOG, Level.ALL);
GenericTestUtils.setLogLevel(ProtobufRpcEngine2.LOG, Level.ALL);
} else {
seed = new Random().nextLong();
}

View File

@ -58,7 +58,7 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
@ -87,7 +87,7 @@ public class TestQuorumJournalManager {
private final List<QuorumJournalManager> toClose = Lists.newLinkedList();
static {
GenericTestUtils.setLogLevel(ProtobufRpcEngine.LOG, Level.ALL);
GenericTestUtils.setLogLevel(ProtobufRpcEngine2.LOG, Level.ALL);
}
@Rule

View File

@ -74,7 +74,7 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.TestWritable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
@ -314,7 +314,7 @@ public class TestBlockToken {
.getReplicaVisibleLength(any(), any());
RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
BlockingService service = ClientDatanodeProtocolService
.newReflectiveBlockingService(mockDN);
return new RPC.Builder(conf).setProtocol(ClientDatanodeProtocolPB.class)

View File

@ -37,7 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.*;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.http.HttpRequestLog;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.ipc.ProtobufRpcEngine.Server;
import org.apache.hadoop.ipc.ProtobufRpcEngine2.Server;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocolPB;
@ -34,7 +34,7 @@ public class HSClientProtocolPBClientImpl extends MRClientProtocolPBClientImpl
InetSocketAddress addr, Configuration conf) throws IOException {
super();
RPC.setProtocolEngine(conf, HSClientProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
proxy = (HSClientProtocolPB)RPC.getProxy(
HSClientProtocolPB.class, clientVersion, addr, conf);
}

View File

@ -24,7 +24,7 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
@ -108,8 +108,10 @@ public class MRClientProtocolPBClientImpl implements MRClientProtocol,
public MRClientProtocolPBClientImpl() {};
public MRClientProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, MRClientProtocolPB.class, ProtobufRpcEngine.class);
public MRClientProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, MRClientProtocolPB.class,
ProtobufRpcEngine2.class);
proxy = RPC.getProxy(MRClientProtocolPB.class, clientVersion, addr, conf);
}

View File

@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.v2.api.HSAdminRefreshProtocol;
import org.apache.hadoop.mapreduce.v2.api.HSAdminRefreshProtocolPB;
@ -93,7 +93,7 @@ public class HSProxies {
private static Object createHSProxy(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi, Class<?> xface,
int rpcTimeout) throws IOException {
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine2.class);
Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
ugi, conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
return proxy;

View File

@ -25,7 +25,7 @@ import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.AccessControlException;
@ -81,7 +81,7 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
@Override
public void serviceInit(Configuration conf) throws Exception {
RPC.setProtocolEngine(conf, RefreshUserMappingsProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator = new RefreshUserMappingsProtocolServerSideTranslatorPB(
this);
@ -154,7 +154,7 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
private void addProtocol(Configuration conf, Class<?> protocol,
BlockingService blockingService) throws IOException {
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine2.class);
clientRpcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol,
blockingService);
}

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.service.impl.pb.client;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@ -58,7 +58,7 @@ public class ClientAMProtocolPBClientImpl
public ClientAMProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ClientAMProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
proxy = RPC.getProxy(ClientAMProtocolPB.class, clientVersion, addr, conf);
}

View File

@ -24,7 +24,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
@ -207,7 +207,7 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
public ApplicationClientProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
}

View File

@ -23,12 +23,11 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.yarn.api.ApplicationClientProtocolPB;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
@ -86,7 +85,7 @@ public class ApplicationHistoryProtocolPBClientImpl implements
public ApplicationHistoryProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ApplicationHistoryProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
proxy =
RPC.getProxy(ApplicationHistoryProtocolPB.class, clientVersion, addr,
conf);

View File

@ -24,7 +24,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
@ -55,7 +55,8 @@ public class ApplicationMasterProtocolPBClientImpl implements ApplicationMasterP
public ApplicationMasterProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
ProtobufRpcEngine2.class);
proxy =
(ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class, clientVersion,
addr, conf);

View File

@ -23,7 +23,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientSCMProtocol;
import org.apache.hadoop.yarn.api.ClientSCMProtocolPB;
@ -50,7 +50,7 @@ public class ClientSCMProtocolPBClientImpl implements ClientSCMProtocol,
public ClientSCMProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ClientSCMProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
proxy = RPC.getProxy(ClientSCMProtocolPB.class, clientVersion, addr, conf);
}

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.api.impl.pb.client;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -106,7 +106,7 @@ public class ContainerManagementProtocolPBClientImpl implements ContainerManagem
public ContainerManagementProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.api.impl.pb.client;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.CsiAdaptorPB;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
@ -57,7 +57,7 @@ public class CsiAdaptorProtocolPBClientImpl
public CsiAdaptorProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, CsiAdaptorPB.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, CsiAdaptorPB.class, ProtobufRpcEngine2.class);
this.proxy = RPC.getProxy(CsiAdaptorPB.class, clientVersion, addr, conf);
}

View File

@ -30,7 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.SecretManager;
@ -165,7 +165,7 @@ public class RpcServerFactoryPBImpl implements RpcServerFactory {
private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
BlockingService blockingService, String portRangeConfig) throws IOException {
RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine2.class);
RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol)
.setInstance(blockingService).setBindAddress(addr.getHostName())
.setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false)

View File

@ -25,7 +25,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@ -114,7 +114,7 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
public ResourceManagerAdministrationProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ResourceManagerAdministrationProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
proxy = (ResourceManagerAdministrationProtocolPB)RPC.getProxy(
ResourceManagerAdministrationProtocolPB.class, clientVersion, addr, conf);
}

View File

@ -23,7 +23,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB;
@ -45,7 +45,7 @@ public class SCMAdminProtocolPBClientImpl implements SCMAdminProtocol,
public SCMAdminProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, SCMAdminProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
proxy = RPC.getProxy(SCMAdminProtocolPB.class, clientVersion, addr, conf);
}

View File

@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -63,7 +63,7 @@ public class CollectorNodemanagerProtocolPBClientImpl implements
public CollectorNodemanagerProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, CollectorNodemanagerProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.api.impl.pb.client;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
@ -63,7 +63,7 @@ public class DistributedSchedulingAMProtocolPBClientImpl implements
public DistributedSchedulingAMProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
proxy = RPC.getProxy(DistributedSchedulingAMProtocolPB.class, clientVersion,
addr, conf);
}

View File

@ -23,7 +23,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@ -52,7 +52,8 @@ public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
private ResourceTrackerPB proxy;
public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, ResourceTrackerPB.class,
ProtobufRpcEngine2.class);
proxy = (ResourceTrackerPB)RPC.getProxy(
ResourceTrackerPB.class, clientVersion, addr, conf);
}

View File

@ -23,7 +23,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@ -50,7 +50,7 @@ public class SCMUploaderProtocolPBClientImpl implements
public SCMUploaderProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, SCMUploaderProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
proxy =
RPC.getProxy(SCMUploaderProtocolPB.class, clientVersion, addr, conf);
}

View File

@ -25,7 +25,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
@ -262,7 +262,7 @@ public class TestRPC {
new DummyContainerManager(), addr, conf, null, 1);
server.start();
RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
ContainerManagementProtocol proxy = (ContainerManagementProtocol)
rpc.getProxy(ContainerManagementProtocol.class,
NetUtils.getConnectAddress(server), conf);

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@ -42,7 +42,8 @@ public class LocalizationProtocolPBClientImpl implements LocalizationProtocol,
private LocalizationProtocolPB proxy;
public LocalizationProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, LocalizationProtocolPB.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, LocalizationProtocolPB.class,
ProtobufRpcEngine2.class);
proxy = (LocalizationProtocolPB)RPC.getProxy(
LocalizationProtocolPB.class, clientVersion, addr, conf);
}

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestRPC.TestImpl;
@ -220,7 +220,7 @@ public class TestNMAuditLogger {
@Test
public void testNMAuditLoggerWithIP() throws Exception {
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine2.class);
// Create server side implementation
MyTestRPCServer serverImpl = new MyTestRPCServer();

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.StandbyException;
@ -201,7 +201,7 @@ public class AdminService extends CompositeService implements
if (rm.getRMContext().isHAEnabled()) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(this);

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
@ -1045,7 +1045,7 @@ public class TestOpportunisticContainerAllocatorAMService {
// Verify that the OpportunisticContainerAllocatorAMSercvice can handle
// vanilla ApplicationMasterProtocol clients
RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
ApplicationMasterProtocolPB ampProxy =
RPC.getProxy(ApplicationMasterProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
@ -1080,7 +1080,7 @@ public class TestOpportunisticContainerAllocatorAMService {
// Verify that the DistrubutedSchedulingService can handle the
// DistributedSchedulingAMProtocol clients as well
RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
DistributedSchedulingAMProtocolPB dsProxy =
RPC.getProxy(DistributedSchedulingAMProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestRPC.TestImpl;
@ -420,7 +420,7 @@ public class TestRMAuditLogger {
public void testRMAuditLoggerWithIP() throws Exception {
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
// Create server side implementation
MyTestRPCServer serverImpl = new MyTestRPCServer();

View File

@ -23,7 +23,7 @@ import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@ -160,7 +160,7 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
Configuration conf = getConfig();
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, CustomProtocol.class,
ProtobufRpcEngine.class);
ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
BlockingService service = TestRpcServiceProtos.CustomProto
@ -194,7 +194,7 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, CustomProtocol.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, CustomProtocol.class, ProtobufRpcEngine2.class);
UserGroupInformation.setConfiguration(conf);
ContainerManagementProtocol containerManager =