HADOOP-13742. Expose NumOpenConnectionsPerUser as a metric. Brahma Reddy Battula.

This commit is contained in:
Kihwal Lee 2016-11-17 12:16:38 -06:00
parent b2d4b7b1b9
commit bd3735554f
4 changed files with 98 additions and 4 deletions

View File

@ -122,6 +122,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import org.codehaus.jackson.map.ObjectMapper;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@ -2151,6 +2152,9 @@ private void processConnectionContext(RpcWritable.Buffer buffer)
authorizeConnection();
// don't set until after authz because connection isn't established
connectionContextRead = true;
if (user != null) {
connectionManager.incrUserConnections(user.getShortUserName());
}
}
/**
@ -3019,7 +3023,20 @@ public int getPort() {
public int getNumOpenConnections() {
return connectionManager.size();
}
/**
* Get the NumOpenConnections/User.
*/
public String getNumOpenConnectionsPerUser() {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper
.writeValueAsString(connectionManager.getUserToConnectionsMap());
} catch (IOException ignored) {
}
return null;
}
/**
* The number of rpc calls in the queue.
* @return The number of rpc calls in the queue.
@ -3139,6 +3156,9 @@ private static int channelIO(ReadableByteChannel readCh,
private class ConnectionManager {
final private AtomicInteger count = new AtomicInteger();
final private Set<Connection> connections;
/* Map to maintain the statistics per User */
final private Map<String, Integer> userToConnectionsMap;
final private Object userToConnectionsMapLock = new Object();
final private Timer idleScanTimer;
final private int idleScanThreshold;
@ -3170,6 +3190,7 @@ private class ConnectionManager {
this.connections = Collections.newSetFromMap(
new ConcurrentHashMap<Connection,Boolean>(
maxQueueSize, 0.75f, readThreads+2));
this.userToConnectionsMap = new ConcurrentHashMap<>();
}
private boolean add(Connection connection) {
@ -3187,7 +3208,39 @@ private boolean remove(Connection connection) {
}
return removed;
}
void incrUserConnections(String user) {
synchronized (userToConnectionsMapLock) {
Integer count = userToConnectionsMap.get(user);
if (count == null) {
count = 1;
} else {
count++;
}
userToConnectionsMap.put(user, count);
}
}
void decrUserConnections(String user) {
synchronized (userToConnectionsMapLock) {
Integer count = userToConnectionsMap.get(user);
if (count == null) {
return;
} else {
count--;
}
if (count == 0) {
userToConnectionsMap.remove(user);
} else {
userToConnectionsMap.put(user, count);
}
}
}
Map<String, Integer> getUserToConnectionsMap() {
return userToConnectionsMap;
}
int size() {
return count.get();
}
@ -3226,6 +3279,10 @@ boolean close(Connection connection) {
// only close if actually removed to avoid double-closing due
// to possible races
connection.close();
// Remove authorized users only
if (connection.user != null && connection.connectionContextRead) {
decrUserConnections(connection.user.getShortUserName());
}
}
return exists;
}

View File

@ -104,6 +104,11 @@ public static RpcMetrics create(Server server, Configuration conf) {
return server.getNumOpenConnections();
}
@Metric("Number of open connections per user")
public String numOpenConnectionsPerUser() {
return server.getNumOpenConnectionsPerUser();
}
@Metric("Length of the call queue") public int callQueueLength() {
return server.getCallQueueLen();
}

View File

@ -64,6 +64,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -1015,7 +1016,7 @@ public UserGroupInformation getRemoteUser() {
@Test
public void testRpcMetrics() throws Exception {
Server server;
final Server server;
TestRpcService proxy = null;
final int interval = 1;
@ -1025,7 +1026,21 @@ public void testRpcMetrics() throws Exception {
RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
server = setupTestServer(conf, 5);
String testUser = "testUser";
UserGroupInformation anotherUser =
UserGroupInformation.createRemoteUser(testUser);
TestRpcService proxy2 =
anotherUser.doAs(new PrivilegedAction<TestRpcService>() {
public TestRpcService run() {
try {
return RPC.getProxy(TestRpcService.class, 0,
server.getListenerAddress(), conf);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
});
try {
proxy = getClient(addr, conf);
@ -1033,6 +1048,7 @@ public void testRpcMetrics() throws Exception {
proxy.ping(null, newEmptyRequest());
proxy.echo(null, newEchoRequest("" + i));
proxy2.echo(null, newEchoRequest("" + i));
}
MetricsRecordBuilder rpcMetrics =
getMetrics(server.getRpcMetrics().name());
@ -1044,7 +1060,16 @@ public void testRpcMetrics() throws Exception {
rpcMetrics);
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
rpcMetrics);
String actualUserVsCon = MetricsAsserts
.getStringMetric("NumOpenConnectionsPerUser", rpcMetrics);
String proxyUser =
UserGroupInformation.getCurrentUser().getShortUserName();
assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));
} finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);
}
stop(server, proxy);
}
}

View File

@ -236,6 +236,13 @@ public static long getLongCounter(String name, MetricsRecordBuilder rb) {
return captor.getValue();
}
public static String getStringMetric(String name, MetricsRecordBuilder rb) {
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture());
checkCaptured(captor, name);
return captor.getValue();
}
/**
* Assert a float gauge metric as expected
* @param name of the metric