svn merge -c 1236901 from trunk for MAPREDUCE-3740.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1237550 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-01-30 07:15:33 +00:00
parent a821a14adc
commit b92bd6aed4
2 changed files with 23 additions and 2 deletions

View File

@ -18,6 +18,9 @@ Release 0.23-PB - Unreleased
Move the support for multiple protocols to lower layer so that Writable, Move the support for multiple protocols to lower layer so that Writable,
PB and Avro can all use it (Sanjay) PB and Avro can all use it (Sanjay)
MAPREDUCE-3740. Fixed broken mapreduce compilation after the patch for
HADOOP-7965. (Devaraj K via vinodkv)
Release 0.23.1 - Unreleased Release 0.23.1 - Unreleased
NEW FEATURES NEW FEATURES

View File

@ -36,10 +36,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtocolMetaInfoPB;
import org.apache.hadoop.ipc.ProtocolProxy; import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcEngine; import org.apache.hadoop.ipc.RpcEngine;
import org.apache.hadoop.ipc.ClientCache; import org.apache.hadoop.ipc.ClientCache;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
@ -73,6 +75,17 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
addr, ticket, conf, factory, rpcTimeout)), false); addr, ticket, conf, factory, rpcTimeout)), false);
} }
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, connId, conf,
factory)), false);
}
private static class Invoker implements InvocationHandler, Closeable { private static class Invoker implements InvocationHandler, Closeable {
private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>(); private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
private boolean isClosed = false; private boolean isClosed = false;
@ -82,8 +95,13 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
public Invoker(Class<?> protocol, InetSocketAddress addr, public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory, UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout) throws IOException { int rpcTimeout) throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol, this(protocol, Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf); ticket, rpcTimeout, conf), conf, factory);
}
public Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, this.client = CLIENTS.getClient(conf, factory,
ProtoSpecificResponseWritable.class); ProtoSpecificResponseWritable.class);
} }