HADOOP-13742. Expose NumOpenConnectionsPerUser as a metric. Brahma Reddy Battula.

(cherry picked from commit bd3735554f)
This commit is contained in:
Kihwal Lee 2016-11-17 12:19:23 -06:00
parent 3e5a85f35e
commit d3fa53a52b
4 changed files with 98 additions and 4 deletions

View File

@ -121,6 +121,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream; import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import org.codehaus.jackson.map.ObjectMapper;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a /** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on * parameter, and return a {@link Writable} as their value. A service runs on
@ -2082,6 +2083,9 @@ public abstract class Server {
authorizeConnection(); authorizeConnection();
// don't set until after authz because connection isn't established // don't set until after authz because connection isn't established
connectionContextRead = true; connectionContextRead = true;
if (user != null) {
connectionManager.incrUserConnections(user.getShortUserName());
}
} }
/** /**
@ -2935,7 +2939,20 @@ public abstract class Server {
public int getNumOpenConnections() { public int getNumOpenConnections() {
return connectionManager.size(); return connectionManager.size();
} }
/**
* Get the NumOpenConnections/User.
*/
public String getNumOpenConnectionsPerUser() {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper
.writeValueAsString(connectionManager.getUserToConnectionsMap());
} catch (IOException ignored) {
}
return null;
}
/** /**
* The number of rpc calls in the queue. * The number of rpc calls in the queue.
* @return The number of rpc calls in the queue. * @return The number of rpc calls in the queue.
@ -3055,6 +3072,9 @@ public abstract class Server {
private class ConnectionManager { private class ConnectionManager {
final private AtomicInteger count = new AtomicInteger(); final private AtomicInteger count = new AtomicInteger();
final private Set<Connection> connections; final private Set<Connection> connections;
/* Map to maintain the statistics per User */
final private Map<String, Integer> userToConnectionsMap;
final private Object userToConnectionsMapLock = new Object();
final private Timer idleScanTimer; final private Timer idleScanTimer;
final private int idleScanThreshold; final private int idleScanThreshold;
@ -3086,6 +3106,7 @@ public abstract class Server {
this.connections = Collections.newSetFromMap( this.connections = Collections.newSetFromMap(
new ConcurrentHashMap<Connection,Boolean>( new ConcurrentHashMap<Connection,Boolean>(
maxQueueSize, 0.75f, readThreads+2)); maxQueueSize, 0.75f, readThreads+2));
this.userToConnectionsMap = new ConcurrentHashMap<>();
} }
private boolean add(Connection connection) { private boolean add(Connection connection) {
@ -3103,7 +3124,39 @@ public abstract class Server {
} }
return removed; return removed;
} }
void incrUserConnections(String user) {
synchronized (userToConnectionsMapLock) {
Integer count = userToConnectionsMap.get(user);
if (count == null) {
count = 1;
} else {
count++;
}
userToConnectionsMap.put(user, count);
}
}
void decrUserConnections(String user) {
synchronized (userToConnectionsMapLock) {
Integer count = userToConnectionsMap.get(user);
if (count == null) {
return;
} else {
count--;
}
if (count == 0) {
userToConnectionsMap.remove(user);
} else {
userToConnectionsMap.put(user, count);
}
}
}
Map<String, Integer> getUserToConnectionsMap() {
return userToConnectionsMap;
}
int size() { int size() {
return count.get(); return count.get();
} }
@ -3142,6 +3195,10 @@ public abstract class Server {
// only close if actually removed to avoid double-closing due // only close if actually removed to avoid double-closing due
// to possible races // to possible races
connection.close(); connection.close();
// Remove authorized users only
if (connection.user != null && connection.connectionContextRead) {
decrUserConnections(connection.user.getShortUserName());
}
} }
return exists; return exists;
} }

View File

@ -104,6 +104,11 @@ public class RpcMetrics {
return server.getNumOpenConnections(); return server.getNumOpenConnections();
} }
@Metric("Number of open connections per user")
public String numOpenConnectionsPerUser() {
return server.getNumOpenConnectionsPerUser();
}
@Metric("Length of the call queue") public int callQueueLength() { @Metric("Length of the call queue") public int callQueueLength() {
return server.getCallQueueLen(); return server.getCallQueueLen();
} }

View File

@ -64,6 +64,7 @@ import java.net.ConnectException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -1027,7 +1028,7 @@ public class TestRPC extends TestRpcBase {
@Test @Test
public void testRpcMetrics() throws Exception { public void testRpcMetrics() throws Exception {
Server server; final Server server;
TestRpcService proxy = null; TestRpcService proxy = null;
final int interval = 1; final int interval = 1;
@ -1037,7 +1038,21 @@ public class TestRPC extends TestRpcBase {
RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval); RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
server = setupTestServer(conf, 5); server = setupTestServer(conf, 5);
String testUser = "testUser";
UserGroupInformation anotherUser =
UserGroupInformation.createRemoteUser(testUser);
TestRpcService proxy2 =
anotherUser.doAs(new PrivilegedAction<TestRpcService>() {
public TestRpcService run() {
try {
return RPC.getProxy(TestRpcService.class, 0,
server.getListenerAddress(), conf);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
});
try { try {
proxy = getClient(addr, conf); proxy = getClient(addr, conf);
@ -1045,6 +1060,7 @@ public class TestRPC extends TestRpcBase {
proxy.ping(null, newEmptyRequest()); proxy.ping(null, newEmptyRequest());
proxy.echo(null, newEchoRequest("" + i)); proxy.echo(null, newEchoRequest("" + i));
proxy2.echo(null, newEchoRequest("" + i));
} }
MetricsRecordBuilder rpcMetrics = MetricsRecordBuilder rpcMetrics =
getMetrics(server.getRpcMetrics().name()); getMetrics(server.getRpcMetrics().name());
@ -1056,7 +1072,16 @@ public class TestRPC extends TestRpcBase {
rpcMetrics); rpcMetrics);
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
rpcMetrics); rpcMetrics);
String actualUserVsCon = MetricsAsserts
.getStringMetric("NumOpenConnectionsPerUser", rpcMetrics);
String proxyUser =
UserGroupInformation.getCurrentUser().getShortUserName();
assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));
} finally { } finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);
}
stop(server, proxy); stop(server, proxy);
} }
} }

View File

@ -236,6 +236,13 @@ public class MetricsAsserts {
return captor.getValue(); return captor.getValue();
} }
public static String getStringMetric(String name, MetricsRecordBuilder rb) {
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture());
checkCaptured(captor, name);
return captor.getValue();
}
/** /**
* Assert a float gauge metric as expected * Assert a float gauge metric as expected
* @param name of the metric * @param name of the metric