HADOOP-14440. Add metrics for connections dropped. Contributed by Eric Badger.

This commit is contained in:
Brahma Reddy Battula 2017-06-06 00:21:03 +08:00
parent 46f7e91980
commit abdd609e51
4 changed files with 29 additions and 4 deletions

View File

@ -64,6 +64,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
@ -1220,6 +1221,7 @@ public abstract class Server {
if (channel.isOpen()) {
IOUtils.cleanup(null, channel);
}
connectionManager.droppedConnections.getAndIncrement();
continue;
}
key.attach(c); // so closeCurrentConnection can get the object
@ -3160,6 +3162,16 @@ public abstract class Server {
return null;
}
/**
* The number of RPC connections dropped due to
* too many connections.
* @return the number of dropped rpc connections
*/
public long getNumDroppedConnections() {
return connectionManager.getDroppedConnections();
}
/**
* The number of rpc calls in the queue.
* @return The number of rpc calls in the queue.
@ -3278,6 +3290,7 @@ public abstract class Server {
private class ConnectionManager {
final private AtomicInteger count = new AtomicInteger();
final private AtomicLong droppedConnections = new AtomicLong();
final private Set<Connection> connections;
/* Map to maintain the statistics per User */
final private Map<String, Integer> userToConnectionsMap;
@ -3364,6 +3377,11 @@ public abstract class Server {
return userToConnectionsMap;
}
long getDroppedConnections() {
return droppedConnections.get();
}
int size() {
return count.get();
}

View File

@ -121,6 +121,10 @@ public class RpcMetrics {
return server.getCallQueueLen();
}
@Metric("Number of dropped connections") public long numDroppedConnections() {
return server.getNumDroppedConnections();
}
// Public instrumentation methods that could be extracted to an
// abstract class if we decide to do custom instrumentation classes a la
// JobTrackerInstrumentation. The methods with //@Override comment are

View File

@ -79,6 +79,7 @@ Each metrics record contains tags such as Hostname and port (number to which ser
| `RpcAuthorizationSuccesses` | Total number of authorization successes |
| `NumOpenConnections` | Current number of open connections |
| `CallQueueLength` | Current length of the call queue |
| `numDroppedConnections` | Total number of dropped connections |
| `rpcQueueTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
| `rpcQueueTime`*num*`s50thPercentileLatency` | Shows the 50th percentile of RPC queue time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
| `rpcQueueTime`*num*`s75thPercentileLatency` | Shows the 75th percentile of RPC queue time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |

View File

@ -1339,7 +1339,7 @@ public class TestIPC {
@Test
public void testMaxConnections() throws Exception {
conf.setInt("ipc.server.max.connections", 5);
conf.setInt("ipc.server.max.connections", 6);
Server server = null;
Thread connectors[] = new Thread[10];
@ -1374,8 +1374,10 @@ public class TestIPC {
}
Thread.sleep(1000);
// server should only accept up to 5 connections
assertEquals(5, server.getNumOpenConnections());
// server should only accept up to 6 connections
assertEquals(6, server.getNumOpenConnections());
// server should drop the other 4 connections
assertEquals(4, server.getNumDroppedConnections());
for (int i = 0; i < 10; i++) {
connectors[i].join();