HADOOP-14440. Add metrics for connections dropped. Contributed by Eric Badger.
(cherry picked from commit abdd609e51
)
This commit is contained in:
parent
5410ec995d
commit
9854682ea4
|
@ -63,6 +63,7 @@ import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import javax.security.sasl.Sasl;
|
import javax.security.sasl.Sasl;
|
||||||
import javax.security.sasl.SaslException;
|
import javax.security.sasl.SaslException;
|
||||||
|
@ -1106,6 +1107,7 @@ public abstract class Server {
|
||||||
if (channel.isOpen()) {
|
if (channel.isOpen()) {
|
||||||
IOUtils.cleanup(null, channel);
|
IOUtils.cleanup(null, channel);
|
||||||
}
|
}
|
||||||
|
connectionManager.droppedConnections.getAndIncrement();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
key.attach(c); // so closeCurrentConnection can get the object
|
key.attach(c); // so closeCurrentConnection can get the object
|
||||||
|
@ -2972,6 +2974,16 @@ public abstract class Server {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of RPC connections dropped due to
|
||||||
|
* too many connections.
|
||||||
|
* @return the number of dropped rpc connections
|
||||||
|
*/
|
||||||
|
public long getNumDroppedConnections() {
|
||||||
|
return connectionManager.getDroppedConnections();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The number of rpc calls in the queue.
|
* The number of rpc calls in the queue.
|
||||||
* @return The number of rpc calls in the queue.
|
* @return The number of rpc calls in the queue.
|
||||||
|
@ -3082,6 +3094,7 @@ public abstract class Server {
|
||||||
|
|
||||||
private class ConnectionManager {
|
private class ConnectionManager {
|
||||||
final private AtomicInteger count = new AtomicInteger();
|
final private AtomicInteger count = new AtomicInteger();
|
||||||
|
final private AtomicLong droppedConnections = new AtomicLong();
|
||||||
final private Set<Connection> connections;
|
final private Set<Connection> connections;
|
||||||
/* Map to maintain the statistics per User */
|
/* Map to maintain the statistics per User */
|
||||||
final private Map<String, Integer> userToConnectionsMap;
|
final private Map<String, Integer> userToConnectionsMap;
|
||||||
|
@ -3168,6 +3181,11 @@ public abstract class Server {
|
||||||
return userToConnectionsMap;
|
return userToConnectionsMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
long getDroppedConnections() {
|
||||||
|
return droppedConnections.get();
|
||||||
|
}
|
||||||
|
|
||||||
int size() {
|
int size() {
|
||||||
return count.get();
|
return count.get();
|
||||||
}
|
}
|
||||||
|
|
|
@ -113,6 +113,10 @@ public class RpcMetrics {
|
||||||
return server.getCallQueueLen();
|
return server.getCallQueueLen();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Metric("Number of dropped connections") public long numDroppedConnections() {
|
||||||
|
return server.getNumDroppedConnections();
|
||||||
|
}
|
||||||
|
|
||||||
// Public instrumentation methods that could be extracted to an
|
// Public instrumentation methods that could be extracted to an
|
||||||
// abstract class if we decide to do custom instrumentation classes a la
|
// abstract class if we decide to do custom instrumentation classes a la
|
||||||
// JobTrackerInstrumentation. The methods with //@Override comment are
|
// JobTrackerInstrumentation. The methods with //@Override comment are
|
||||||
|
|
|
@ -79,6 +79,7 @@ Each metrics record contains tags such as Hostname and port (number to which ser
|
||||||
| `RpcAuthorizationSuccesses` | Total number of authorization successes |
|
| `RpcAuthorizationSuccesses` | Total number of authorization successes |
|
||||||
| `NumOpenConnections` | Current number of open connections |
|
| `NumOpenConnections` | Current number of open connections |
|
||||||
| `CallQueueLength` | Current length of the call queue |
|
| `CallQueueLength` | Current length of the call queue |
|
||||||
|
| `numDroppedConnections` | Total number of dropped connections |
|
||||||
| `rpcQueueTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
| `rpcQueueTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
| `rpcQueueTime`*num*`s50thPercentileLatency` | Shows the 50th percentile of RPC queue time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
| `rpcQueueTime`*num*`s50thPercentileLatency` | Shows the 50th percentile of RPC queue time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
| `rpcQueueTime`*num*`s75thPercentileLatency` | Shows the 75th percentile of RPC queue time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
| `rpcQueueTime`*num*`s75thPercentileLatency` | Shows the 75th percentile of RPC queue time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
|
|
|
@ -1339,7 +1339,7 @@ public class TestIPC {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMaxConnections() throws Exception {
|
public void testMaxConnections() throws Exception {
|
||||||
conf.setInt("ipc.server.max.connections", 5);
|
conf.setInt("ipc.server.max.connections", 6);
|
||||||
Server server = null;
|
Server server = null;
|
||||||
Thread connectors[] = new Thread[10];
|
Thread connectors[] = new Thread[10];
|
||||||
|
|
||||||
|
@ -1374,8 +1374,10 @@ public class TestIPC {
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
// server should only accept up to 5 connections
|
// server should only accept up to 6 connections
|
||||||
assertEquals(5, server.getNumOpenConnections());
|
assertEquals(6, server.getNumOpenConnections());
|
||||||
|
// server should drop the other 4 connections
|
||||||
|
assertEquals(4, server.getNumDroppedConnections());
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
connectors[i].join();
|
connectors[i].join();
|
||||||
|
|
Loading…
Reference in New Issue