diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 1c7e76a5068..8f1956e4b4d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -122,6 +122,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; +import org.codehaus.jackson.map.ObjectMapper; /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -2151,6 +2152,9 @@ private void processConnectionContext(RpcWritable.Buffer buffer) authorizeConnection(); // don't set until after authz because connection isn't established connectionContextRead = true; + if (user != null) { + connectionManager.incrUserConnections(user.getShortUserName()); + } } /** @@ -3019,7 +3023,20 @@ public int getPort() { public int getNumOpenConnections() { return connectionManager.size(); } - + + /** + * Get the NumOpenConnections/User. + */ + public String getNumOpenConnectionsPerUser() { + ObjectMapper mapper = new ObjectMapper(); + try { + return mapper + .writeValueAsString(connectionManager.getUserToConnectionsMap()); + } catch (IOException ignored) { + } + return null; + } + /** * The number of rpc calls in the queue. * @return The number of rpc calls in the queue. @@ -3139,6 +3156,9 @@ private static int channelIO(ReadableByteChannel readCh, private class ConnectionManager { final private AtomicInteger count = new AtomicInteger(); final private Set connections; + /* Map to maintain the statistics per User */ + final private Map userToConnectionsMap; + final private Object userToConnectionsMapLock = new Object(); final private Timer idleScanTimer; final private int idleScanThreshold; @@ -3170,6 +3190,7 @@ private class ConnectionManager { this.connections = Collections.newSetFromMap( new ConcurrentHashMap( maxQueueSize, 0.75f, readThreads+2)); + this.userToConnectionsMap = new ConcurrentHashMap<>(); } private boolean add(Connection connection) { @@ -3187,7 +3208,39 @@ private boolean remove(Connection connection) { } return removed; } - + + void incrUserConnections(String user) { + synchronized (userToConnectionsMapLock) { + Integer count = userToConnectionsMap.get(user); + if (count == null) { + count = 1; + } else { + count++; + } + userToConnectionsMap.put(user, count); + } + } + + void decrUserConnections(String user) { + synchronized (userToConnectionsMapLock) { + Integer count = userToConnectionsMap.get(user); + if (count == null) { + return; + } else { + count--; + } + if (count == 0) { + userToConnectionsMap.remove(user); + } else { + userToConnectionsMap.put(user, count); + } + } + } + + Map getUserToConnectionsMap() { + return userToConnectionsMap; + } + int size() { return count.get(); } @@ -3226,6 +3279,10 @@ boolean close(Connection connection) { // only close if actually removed to avoid double-closing due // to possible races connection.close(); + // Remove authorized users only + if (connection.user != null && connection.connectionContextRead) { + decrUserConnections(connection.user.getShortUserName()); + } } return exists; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index 5373f95e698..ef436189a20 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -104,6 +104,11 @@ public static RpcMetrics create(Server server, Configuration conf) { return server.getNumOpenConnections(); } + @Metric("Number of open connections per user") + public String numOpenConnectionsPerUser() { + return server.getNumOpenConnectionsPerUser(); + } + @Metric("Length of the call queue") public int callQueueLength() { return server.getCallQueueLen(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 72b603aa29b..f0d883b54c6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -64,6 +64,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; +import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -1015,7 +1016,7 @@ public UserGroupInformation getRemoteUser() { @Test public void testRpcMetrics() throws Exception { - Server server; + final Server server; TestRpcService proxy = null; final int interval = 1; @@ -1025,7 +1026,21 @@ public void testRpcMetrics() throws Exception { RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval); server = setupTestServer(conf, 5); - + String testUser = "testUser"; + UserGroupInformation anotherUser = + UserGroupInformation.createRemoteUser(testUser); + TestRpcService proxy2 = + anotherUser.doAs(new PrivilegedAction() { + public TestRpcService run() { + try { + return RPC.getProxy(TestRpcService.class, 0, + server.getListenerAddress(), conf); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }); try { proxy = getClient(addr, conf); @@ -1033,6 +1048,7 @@ public void testRpcMetrics() throws Exception { proxy.ping(null, newEmptyRequest()); proxy.echo(null, newEchoRequest("" + i)); + proxy2.echo(null, newEchoRequest("" + i)); } MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name()); @@ -1044,7 +1060,16 @@ public void testRpcMetrics() throws Exception { rpcMetrics); MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", rpcMetrics); + String actualUserVsCon = MetricsAsserts + .getStringMetric("NumOpenConnectionsPerUser", rpcMetrics); + String proxyUser = + UserGroupInformation.getCurrentUser().getShortUserName(); + assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1")); + assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1")); } finally { + if (proxy2 != null) { + RPC.stopProxy(proxy2); + } stop(server, proxy); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java index 982481e6081..5d87b0732f9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java @@ -236,6 +236,13 @@ public static long getLongCounter(String name, MetricsRecordBuilder rb) { return captor.getValue(); } + public static String getStringMetric(String name, MetricsRecordBuilder rb) { + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture()); + checkCaptured(captor, name); + return captor.getValue(); + } + /** * Assert a float gauge metric as expected * @param name of the metric