HADOOP-10070. RPC client doesn't use per-connection conf to determine server's expected Kerberos principal name. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1570777 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
34f06451f9
commit
bd10814f31
|
@ -69,6 +69,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
|
|
||||||
HADOOP-10355. Fix TestLoadGenerator#testLoadGenerator. (Haohui Mai via jing9)
|
HADOOP-10355. Fix TestLoadGenerator#testLoadGenerator. (Haohui Mai via jing9)
|
||||||
|
|
||||||
|
HADOOP-10070. RPC client doesn't use per-connection conf to determine
|
||||||
|
server's expected Kerberos principal name. (atm)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -542,8 +542,11 @@ public class Client {
|
||||||
|
|
||||||
private synchronized AuthMethod setupSaslConnection(final InputStream in2,
|
private synchronized AuthMethod setupSaslConnection(final InputStream in2,
|
||||||
final OutputStream out2) throws IOException, InterruptedException {
|
final OutputStream out2) throws IOException, InterruptedException {
|
||||||
|
// Do not use Client.conf here! We must use ConnectionId.conf, since the
|
||||||
|
// Client object is cached and shared between all RPC clients, even those
|
||||||
|
// for separate services.
|
||||||
saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
|
saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
|
||||||
remoteId.getProtocol(), remoteId.getAddress(), conf);
|
remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf);
|
||||||
return saslRpcClient.saslConnect(in2, out2);
|
return saslRpcClient.saslConnect(in2, out2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1480,21 +1483,31 @@ public class Client {
|
||||||
private final boolean doPing; //do we need to send ping message
|
private final boolean doPing; //do we need to send ping message
|
||||||
private final int pingInterval; // how often sends ping to the server in msecs
|
private final int pingInterval; // how often sends ping to the server in msecs
|
||||||
private String saslQop; // here for testing
|
private String saslQop; // here for testing
|
||||||
|
private final Configuration conf; // used to get the expected kerberos principal name
|
||||||
|
|
||||||
ConnectionId(InetSocketAddress address, Class<?> protocol,
|
ConnectionId(InetSocketAddress address, Class<?> protocol,
|
||||||
UserGroupInformation ticket, int rpcTimeout, int maxIdleTime,
|
UserGroupInformation ticket, int rpcTimeout,
|
||||||
RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts,
|
RetryPolicy connectionRetryPolicy, Configuration conf) {
|
||||||
boolean tcpNoDelay, boolean doPing, int pingInterval) {
|
|
||||||
this.protocol = protocol;
|
this.protocol = protocol;
|
||||||
this.address = address;
|
this.address = address;
|
||||||
this.ticket = ticket;
|
this.ticket = ticket;
|
||||||
this.rpcTimeout = rpcTimeout;
|
this.rpcTimeout = rpcTimeout;
|
||||||
this.maxIdleTime = maxIdleTime;
|
|
||||||
this.connectionRetryPolicy = connectionRetryPolicy;
|
this.connectionRetryPolicy = connectionRetryPolicy;
|
||||||
this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts;
|
|
||||||
this.tcpNoDelay = tcpNoDelay;
|
this.maxIdleTime = conf.getInt(
|
||||||
this.doPing = doPing;
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
|
||||||
this.pingInterval = pingInterval;
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
|
||||||
|
this.maxRetriesOnSocketTimeouts = conf.getInt(
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
|
||||||
|
this.tcpNoDelay = conf.getBoolean(
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT);
|
||||||
|
this.doPing = conf.getBoolean(
|
||||||
|
CommonConfigurationKeys.IPC_CLIENT_PING_KEY,
|
||||||
|
CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT);
|
||||||
|
this.pingInterval = (doPing ? Client.getPingInterval(conf) : 0);
|
||||||
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
InetSocketAddress getAddress() {
|
InetSocketAddress getAddress() {
|
||||||
|
@ -1572,19 +1585,8 @@ public class Client {
|
||||||
max, retryInterval, TimeUnit.MILLISECONDS);
|
max, retryInterval, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean doPing =
|
|
||||||
conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
|
|
||||||
return new ConnectionId(addr, protocol, ticket, rpcTimeout,
|
return new ConnectionId(addr, protocol, ticket, rpcTimeout,
|
||||||
conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
|
connectionRetryPolicy, conf);
|
||||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT),
|
|
||||||
connectionRetryPolicy,
|
|
||||||
conf.getInt(
|
|
||||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
|
||||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT),
|
|
||||||
conf.getBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
|
|
||||||
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT),
|
|
||||||
doPing,
|
|
||||||
(doPing ? Client.getPingInterval(conf) : 0));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean isEqual(Object a, Object b) {
|
static boolean isEqual(Object a, Object b) {
|
||||||
|
|
|
@ -59,6 +59,9 @@ public class ClientCache {
|
||||||
} else {
|
} else {
|
||||||
client.incCount();
|
client.incCount();
|
||||||
}
|
}
|
||||||
|
if (Client.LOG.isDebugEnabled()) {
|
||||||
|
Client.LOG.debug("getting client out of cache: " + client);
|
||||||
|
}
|
||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,13 +93,23 @@ public class ClientCache {
|
||||||
* A RPC client is closed only when its reference count becomes zero.
|
* A RPC client is closed only when its reference count becomes zero.
|
||||||
*/
|
*/
|
||||||
public void stopClient(Client client) {
|
public void stopClient(Client client) {
|
||||||
|
if (Client.LOG.isDebugEnabled()) {
|
||||||
|
Client.LOG.debug("stopping client from cache: " + client);
|
||||||
|
}
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
client.decCount();
|
client.decCount();
|
||||||
if (client.isZeroReference()) {
|
if (client.isZeroReference()) {
|
||||||
|
if (Client.LOG.isDebugEnabled()) {
|
||||||
|
Client.LOG.debug("removing client from cache: " + client);
|
||||||
|
}
|
||||||
clients.remove(client.getSocketFactory());
|
clients.remove(client.getSocketFactory());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (client.isZeroReference()) {
|
if (client.isZeroReference()) {
|
||||||
|
if (Client.LOG.isDebugEnabled()) {
|
||||||
|
Client.LOG.debug("stopping actual client because no more references remain: "
|
||||||
|
+ client);
|
||||||
|
}
|
||||||
client.stop();
|
client.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -309,6 +309,10 @@ public class SaslRpcClient {
|
||||||
// check that the server advertised principal matches our conf
|
// check that the server advertised principal matches our conf
|
||||||
String confPrincipal = SecurityUtil.getServerPrincipal(
|
String confPrincipal = SecurityUtil.getServerPrincipal(
|
||||||
conf.get(serverKey), serverAddr.getAddress());
|
conf.get(serverKey), serverAddr.getAddress());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("getting serverKey: " + serverKey + " conf value: " + conf.get(serverKey)
|
||||||
|
+ " principal: " + confPrincipal);
|
||||||
|
}
|
||||||
if (confPrincipal == null || confPrincipal.isEmpty()) {
|
if (confPrincipal == null || confPrincipal.isEmpty()) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Failed to specify server's Kerberos principal name");
|
"Failed to specify server's Kerberos principal name");
|
||||||
|
|
Loading…
Reference in New Issue