diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index d52418fce02..551f8805f19 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -69,6 +69,9 @@ Release 2.4.0 - UNRELEASED HADOOP-10355. Fix TestLoadGenerator#testLoadGenerator. (Haohui Mai via jing9) + HADOOP-10070. RPC client doesn't use per-connection conf to determine + server's expected Kerberos principal name. (atm) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 4cc0a1b6577..d7cadbe4303 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -542,8 +542,11 @@ public class Client { private synchronized AuthMethod setupSaslConnection(final InputStream in2, final OutputStream out2) throws IOException, InterruptedException { + // Do not use Client.conf here! We must use ConnectionId.conf, since the + // Client object is cached and shared between all RPC clients, even those + // for separate services. saslRpcClient = new SaslRpcClient(remoteId.getTicket(), - remoteId.getProtocol(), remoteId.getAddress(), conf); + remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf); return saslRpcClient.saslConnect(in2, out2); } @@ -1480,21 +1483,31 @@ public class Client { private final boolean doPing; //do we need to send ping message private final int pingInterval; // how often sends ping to the server in msecs private String saslQop; // here for testing + private final Configuration conf; // used to get the expected kerberos principal name ConnectionId(InetSocketAddress address, Class protocol, - UserGroupInformation ticket, int rpcTimeout, int maxIdleTime, - RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts, - boolean tcpNoDelay, boolean doPing, int pingInterval) { + UserGroupInformation ticket, int rpcTimeout, + RetryPolicy connectionRetryPolicy, Configuration conf) { this.protocol = protocol; this.address = address; this.ticket = ticket; this.rpcTimeout = rpcTimeout; - this.maxIdleTime = maxIdleTime; this.connectionRetryPolicy = connectionRetryPolicy; - this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts; - this.tcpNoDelay = tcpNoDelay; - this.doPing = doPing; - this.pingInterval = pingInterval; + + this.maxIdleTime = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT); + this.maxRetriesOnSocketTimeouts = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); + this.tcpNoDelay = conf.getBoolean( + CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT); + this.doPing = conf.getBoolean( + CommonConfigurationKeys.IPC_CLIENT_PING_KEY, + CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT); + this.pingInterval = (doPing ? Client.getPingInterval(conf) : 0); + this.conf = conf; } InetSocketAddress getAddress() { @@ -1572,19 +1585,8 @@ public class Client { max, retryInterval, TimeUnit.MILLISECONDS); } - boolean doPing = - conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); return new ConnectionId(addr, protocol, ticket, rpcTimeout, - conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT), - connectionRetryPolicy, - conf.getInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT), - conf.getBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT), - doPing, - (doPing ? Client.getPingInterval(conf) : 0)); + connectionRetryPolicy, conf); } static boolean isEqual(Object a, Object b) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java index dccd15dffb3..d0fb8fd1145 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java @@ -59,6 +59,9 @@ public class ClientCache { } else { client.incCount(); } + if (Client.LOG.isDebugEnabled()) { + Client.LOG.debug("getting client out of cache: " + client); + } return client; } @@ -90,13 +93,23 @@ public class ClientCache { * A RPC client is closed only when its reference count becomes zero. */ public void stopClient(Client client) { + if (Client.LOG.isDebugEnabled()) { + Client.LOG.debug("stopping client from cache: " + client); + } synchronized (this) { client.decCount(); if (client.isZeroReference()) { + if (Client.LOG.isDebugEnabled()) { + Client.LOG.debug("removing client from cache: " + client); + } clients.remove(client.getSocketFactory()); } } if (client.isZeroReference()) { + if (Client.LOG.isDebugEnabled()) { + Client.LOG.debug("stopping actual client because no more references remain: " + + client); + } client.stop(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index a37616abd56..5343737ec34 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -309,6 +309,10 @@ public class SaslRpcClient { // check that the server advertised principal matches our conf String confPrincipal = SecurityUtil.getServerPrincipal( conf.get(serverKey), serverAddr.getAddress()); + if (LOG.isDebugEnabled()) { + LOG.debug("getting serverKey: " + serverKey + " conf value: " + conf.get(serverKey) + + " principal: " + confPrincipal); + } if (confPrincipal == null || confPrincipal.isEmpty()) { throw new IllegalArgumentException( "Failed to specify server's Kerberos principal name");