From d3fa53a52beb066deac529906149f6066b629b07 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Thu, 17 Nov 2016 12:19:23 -0600 Subject: [PATCH] HADOOP-13742. Expose NumOpenConnectionsPerUser as a metric. Brahma Reddy Battula. (cherry picked from commit bd3735554fa5c3bc064c57ec78f4308430b14b48) --- .../java/org/apache/hadoop/ipc/Server.java | 61 ++++++++++++++++++- .../apache/hadoop/ipc/metrics/RpcMetrics.java | 5 ++ .../java/org/apache/hadoop/ipc/TestRPC.java | 29 ++++++++- .../apache/hadoop/test/MetricsAsserts.java | 7 +++ 4 files changed, 98 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 286943705fa..1b81b738cdb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -121,6 +121,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; +import org.codehaus.jackson.map.ObjectMapper; /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -2082,6 +2083,9 @@ public abstract class Server { authorizeConnection(); // don't set until after authz because connection isn't established connectionContextRead = true; + if (user != null) { + connectionManager.incrUserConnections(user.getShortUserName()); + } } /** @@ -2935,7 +2939,20 @@ public abstract class Server { public int getNumOpenConnections() { return connectionManager.size(); } - + + /** + * Get the NumOpenConnections/User. + */ + public String getNumOpenConnectionsPerUser() { + ObjectMapper mapper = new ObjectMapper(); + try { + return mapper + .writeValueAsString(connectionManager.getUserToConnectionsMap()); + } catch (IOException ignored) { + } + return null; + } + /** * The number of rpc calls in the queue. * @return The number of rpc calls in the queue. @@ -3055,6 +3072,9 @@ public abstract class Server { private class ConnectionManager { final private AtomicInteger count = new AtomicInteger(); final private Set connections; + /* Map to maintain the statistics per User */ + final private Map userToConnectionsMap; + final private Object userToConnectionsMapLock = new Object(); final private Timer idleScanTimer; final private int idleScanThreshold; @@ -3086,6 +3106,7 @@ public abstract class Server { this.connections = Collections.newSetFromMap( new ConcurrentHashMap( maxQueueSize, 0.75f, readThreads+2)); + this.userToConnectionsMap = new ConcurrentHashMap<>(); } private boolean add(Connection connection) { @@ -3103,7 +3124,39 @@ public abstract class Server { } return removed; } - + + void incrUserConnections(String user) { + synchronized (userToConnectionsMapLock) { + Integer count = userToConnectionsMap.get(user); + if (count == null) { + count = 1; + } else { + count++; + } + userToConnectionsMap.put(user, count); + } + } + + void decrUserConnections(String user) { + synchronized (userToConnectionsMapLock) { + Integer count = userToConnectionsMap.get(user); + if (count == null) { + return; + } else { + count--; + } + if (count == 0) { + userToConnectionsMap.remove(user); + } else { + userToConnectionsMap.put(user, count); + } + } + } + + Map getUserToConnectionsMap() { + return userToConnectionsMap; + } + int size() { return count.get(); } @@ -3142,6 +3195,10 @@ public abstract class Server { // only close if actually removed to avoid double-closing due // to possible races connection.close(); + // Remove authorized users only + if (connection.user != null && connection.connectionContextRead) { + decrUserConnections(connection.user.getShortUserName()); + } } return exists; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index 5373f95e698..ef436189a20 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -104,6 +104,11 @@ public class RpcMetrics { return server.getNumOpenConnections(); } + @Metric("Number of open connections per user") + public String numOpenConnectionsPerUser() { + return server.getNumOpenConnectionsPerUser(); + } + @Metric("Length of the call queue") public int callQueueLength() { return server.getCallQueueLen(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 287f0e5dd68..29f5841c15b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -64,6 +64,7 @@ import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; +import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -1027,7 +1028,7 @@ public class TestRPC extends TestRpcBase { @Test public void testRpcMetrics() throws Exception { - Server server; + final Server server; TestRpcService proxy = null; final int interval = 1; @@ -1037,7 +1038,21 @@ public class TestRPC extends TestRpcBase { RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval); server = setupTestServer(conf, 5); - + String testUser = "testUser"; + UserGroupInformation anotherUser = + UserGroupInformation.createRemoteUser(testUser); + TestRpcService proxy2 = + anotherUser.doAs(new PrivilegedAction() { + public TestRpcService run() { + try { + return RPC.getProxy(TestRpcService.class, 0, + server.getListenerAddress(), conf); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }); try { proxy = getClient(addr, conf); @@ -1045,6 +1060,7 @@ public class TestRPC extends TestRpcBase { proxy.ping(null, newEmptyRequest()); proxy.echo(null, newEchoRequest("" + i)); + proxy2.echo(null, newEchoRequest("" + i)); } MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name()); @@ -1056,7 +1072,16 @@ public class TestRPC extends TestRpcBase { rpcMetrics); MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", rpcMetrics); + String actualUserVsCon = MetricsAsserts + .getStringMetric("NumOpenConnectionsPerUser", rpcMetrics); + String proxyUser = + UserGroupInformation.getCurrentUser().getShortUserName(); + assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1")); + assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1")); } finally { + if (proxy2 != null) { + RPC.stopProxy(proxy2); + } stop(server, proxy); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java index 982481e6081..5d87b0732f9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java @@ -236,6 +236,13 @@ public class MetricsAsserts { return captor.getValue(); } + public static String getStringMetric(String name, MetricsRecordBuilder rb) { + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture()); + checkCaptured(captor, name); + return captor.getValue(); + } + /** * Assert a float gauge metric as expected * @param name of the metric