diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index d9c930e8090..707c90e99c9 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -24,6 +24,9 @@ Release 2.7.4 - UNRELEASED HADOOP-13655. document object store use with fs shell and distcp. (Steve Loughran via Mingliang Liu) + HADOOP-13742. Expose "NumOpenConnectionsPerUser" as a metric. + (Brahma Reddy Battula via kihwal) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index d0e5022685f..487fd217573 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -127,6 +127,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; +import org.codehaus.jackson.map.ObjectMapper; /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -1837,6 +1838,9 @@ public abstract class Server { authorizeConnection(); // don't set until after authz because connection isn't established connectionContextRead = true; + if (user != null) { + connectionManager.incrUserConnections(user.getShortUserName()); + } } /** @@ -2690,7 +2694,20 @@ public abstract class Server { public int getNumOpenConnections() { return connectionManager.size(); } - + + /** + * Get the NumOpenConnections/User. + */ + public String getNumOpenConnectionsPerUser() { + ObjectMapper mapper = new ObjectMapper(); + try { + return mapper + .writeValueAsString(connectionManager.getUserToConnectionsMap()); + } catch (IOException ignored) { + } + return null; + } + /** * The number of rpc calls in the queue. * @return The number of rpc calls in the queue. @@ -2802,6 +2819,9 @@ public abstract class Server { private class ConnectionManager { final private AtomicInteger count = new AtomicInteger(); final private Set connections; + /* Map to maintain the statistics per User */ + final private Map userToConnectionsMap; + final private Object userToConnectionsMapLock = new Object(); final private Timer idleScanTimer; final private int idleScanThreshold; @@ -2833,6 +2853,7 @@ public abstract class Server { this.connections = Collections.newSetFromMap( new ConcurrentHashMap( maxQueueSize, 0.75f, readThreads+2)); + this.userToConnectionsMap = new ConcurrentHashMap<>(); } private boolean add(Connection connection) { @@ -2850,7 +2871,39 @@ public abstract class Server { } return removed; } - + + void incrUserConnections(String user) { + synchronized (userToConnectionsMapLock) { + Integer count = userToConnectionsMap.get(user); + if (count == null) { + count = 1; + } else { + count++; + } + userToConnectionsMap.put(user, count); + } + } + + void decrUserConnections(String user) { + synchronized (userToConnectionsMapLock) { + Integer count = userToConnectionsMap.get(user); + if (count == null) { + return; + } else { + count--; + } + if (count == 0) { + userToConnectionsMap.remove(user); + } else { + userToConnectionsMap.put(user, count); + } + } + } + + Map getUserToConnectionsMap() { + return userToConnectionsMap; + } + int size() { return count.get(); } @@ -2889,6 +2942,10 @@ public abstract class Server { // only close if actually removed to avoid double-closing due // to possible races connection.close(); + // Remove authorized users only + if (connection.user != null && connection.connectionContextRead) { + decrUserConnections(connection.user.getShortUserName()); + } } return exists; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index 07968e86dad..2a0e127cd7b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -104,6 +104,11 @@ public class RpcMetrics { return server.getNumOpenConnections(); } + @Metric("Number of open connections per user") + public String numOpenConnectionsPerUser() { + return server.getNumOpenConnectionsPerUser(); + } + @Metric("Length of the call queue") public int callQueueLength() { return server.getCallQueueLen(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index ce0ca2fbc0a..e4cf76c2589 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -40,6 +40,7 @@ import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -995,12 +996,28 @@ public class TestRPC { .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) .build(); server.start(); + String testUser = "testUser"; + UserGroupInformation anotherUser = + UserGroupInformation.createRemoteUser(testUser); + TestProtocol proxy2 = + anotherUser.doAs(new PrivilegedAction() { + public TestProtocol run() { + try { + return RPC.getProxy(TestProtocol.class, 0, + server.getListenerAddress(), conf); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }); final TestProtocol proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, server.getListenerAddress(), configuration); try { for (int i=0; i<1000; i++) { proxy.ping(); proxy.echo("" + i); + proxy2.echo("" + i); } MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name()); @@ -1012,10 +1029,19 @@ public class TestRPC { rpcMetrics); MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", rpcMetrics); + String actualUserVsCon = MetricsAsserts + .getStringMetric("NumOpenConnectionsPerUser", rpcMetrics); + String proxyUser = + UserGroupInformation.getCurrentUser().getShortUserName(); + assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1")); + assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1")); } finally { if (proxy != null) { RPC.stopProxy(proxy); } + if (proxy2 != null) { + RPC.stopProxy(proxy2); + } server.stop(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java index 982481e6081..5d87b0732f9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java @@ -236,6 +236,13 @@ public class MetricsAsserts { return captor.getValue(); } + public static String getStringMetric(String name, MetricsRecordBuilder rb) { + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture()); + checkCaptured(captor, name); + return captor.getValue(); + } + /** * Assert a float gauge metric as expected * @param name of the metric