HADOOP-13742. Expose NumOpenConnectionsPerUser as a metric. Brahma Reddy Battula.

(cherry picked from commit bd3735554f)
(cherry picked from commit 0fb1306f93)
This commit is contained in:
Kihwal Lee 2016-11-17 12:20:01 -06:00 committed by Zhe Zhang
parent c594904eca
commit ba10a055e4
5 changed files with 100 additions and 2 deletions

View File

@ -24,6 +24,9 @@ Release 2.7.4 - UNRELEASED
HADOOP-13655. document object store use with fs shell and distcp.
(Steve Loughran via Mingliang Liu)
HADOOP-13742. Expose "NumOpenConnectionsPerUser" as a metric.
(Brahma Reddy Battula via kihwal)
OPTIMIZATIONS
BUG FIXES

View File

@ -127,6 +127,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import org.codehaus.jackson.map.ObjectMapper;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@ -1837,6 +1838,9 @@ public abstract class Server {
authorizeConnection();
// don't set until after authz because connection isn't established
connectionContextRead = true;
if (user != null) {
connectionManager.incrUserConnections(user.getShortUserName());
}
}
/**
@ -2690,7 +2694,20 @@ public abstract class Server {
public int getNumOpenConnections() {
return connectionManager.size();
}
/**
* Get the NumOpenConnections/User.
*/
public String getNumOpenConnectionsPerUser() {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper
.writeValueAsString(connectionManager.getUserToConnectionsMap());
} catch (IOException ignored) {
}
return null;
}
/**
* The number of rpc calls in the queue.
* @return The number of rpc calls in the queue.
@ -2802,6 +2819,9 @@ public abstract class Server {
private class ConnectionManager {
final private AtomicInteger count = new AtomicInteger();
final private Set<Connection> connections;
/* Map to maintain the statistics per User */
final private Map<String, Integer> userToConnectionsMap;
final private Object userToConnectionsMapLock = new Object();
final private Timer idleScanTimer;
final private int idleScanThreshold;
@ -2833,6 +2853,7 @@ public abstract class Server {
this.connections = Collections.newSetFromMap(
new ConcurrentHashMap<Connection,Boolean>(
maxQueueSize, 0.75f, readThreads+2));
this.userToConnectionsMap = new ConcurrentHashMap<>();
}
private boolean add(Connection connection) {
@ -2850,7 +2871,39 @@ public abstract class Server {
}
return removed;
}
void incrUserConnections(String user) {
synchronized (userToConnectionsMapLock) {
Integer count = userToConnectionsMap.get(user);
if (count == null) {
count = 1;
} else {
count++;
}
userToConnectionsMap.put(user, count);
}
}
void decrUserConnections(String user) {
synchronized (userToConnectionsMapLock) {
Integer count = userToConnectionsMap.get(user);
if (count == null) {
return;
} else {
count--;
}
if (count == 0) {
userToConnectionsMap.remove(user);
} else {
userToConnectionsMap.put(user, count);
}
}
}
Map<String, Integer> getUserToConnectionsMap() {
return userToConnectionsMap;
}
int size() {
return count.get();
}
@ -2889,6 +2942,10 @@ public abstract class Server {
// only close if actually removed to avoid double-closing due
// to possible races
connection.close();
// Remove authorized users only
if (connection.user != null && connection.connectionContextRead) {
decrUserConnections(connection.user.getShortUserName());
}
}
return exists;
}

View File

@ -104,6 +104,11 @@ public class RpcMetrics {
return server.getNumOpenConnections();
}
@Metric("Number of open connections per user")
public String numOpenConnectionsPerUser() {
return server.getNumOpenConnectionsPerUser();
}
@Metric("Length of the call queue") public int callQueueLength() {
return server.getCallQueueLen();
}

View File

@ -40,6 +40,7 @@ import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -995,12 +996,28 @@ public class TestRPC {
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.build();
server.start();
String testUser = "testUser";
UserGroupInformation anotherUser =
UserGroupInformation.createRemoteUser(testUser);
TestProtocol proxy2 =
anotherUser.doAs(new PrivilegedAction<TestProtocol>() {
public TestProtocol run() {
try {
return RPC.getProxy(TestProtocol.class, 0,
server.getListenerAddress(), conf);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
});
final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, server.getListenerAddress(), configuration);
try {
for (int i=0; i<1000; i++) {
proxy.ping();
proxy.echo("" + i);
proxy2.echo("" + i);
}
MetricsRecordBuilder rpcMetrics =
getMetrics(server.getRpcMetrics().name());
@ -1012,10 +1029,19 @@ public class TestRPC {
rpcMetrics);
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
rpcMetrics);
String actualUserVsCon = MetricsAsserts
.getStringMetric("NumOpenConnectionsPerUser", rpcMetrics);
String proxyUser =
UserGroupInformation.getCurrentUser().getShortUserName();
assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));
} finally {
if (proxy != null) {
RPC.stopProxy(proxy);
}
if (proxy2 != null) {
RPC.stopProxy(proxy2);
}
server.stop();
}
}

View File

@ -236,6 +236,13 @@ public class MetricsAsserts {
return captor.getValue();
}
public static String getStringMetric(String name, MetricsRecordBuilder rb) {
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture());
checkCaptured(captor, name);
return captor.getValue();
}
/**
* Assert a float gauge metric as expected
* @param name of the metric