[AMQ-8976] Add maxConnectionExceededCount metric for connectors (#850)

This commit is contained in:
Matt Pavlovich 2022-07-18 16:20:25 -05:00 committed by GitHub
parent 2b58e7b09f
commit 680717cb95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 135 additions and 13 deletions

View File

@ -29,7 +29,7 @@ import org.junit.After;
*/
public class AmqpClientTestSupport extends AmqpTestSupport {
private String connectorScheme = "amqp";
protected String connectorScheme = "amqp";
private boolean useSSL;
private List<AmqpConnection> connections = new ArrayList<AmqpConnection>();

View File

@ -68,6 +68,7 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
}
assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
assertEquals(Long.valueOf(0l), Long.valueOf(getProxyToConnectionView(getConnectorScheme()).getMaxConnectionExceededCount()));
try {
AmqpConnection connection = trackConnection(client.createConnection());
@ -78,12 +79,17 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
}
assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
assertEquals(Long.valueOf(1l), Long.valueOf(getProxyToConnectionView(getConnectorScheme()).getMaxConnectionExceededCount()));
for (AmqpConnection connection : connections) {
connection.close();
}
assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
// Confirm reset statistics
getProxyToConnectionView(getConnectorScheme()).resetStatistics();
assertEquals(Long.valueOf(0l), Long.valueOf(getProxyToConnectionView(getConnectorScheme()).getMaxConnectionExceededCount()));
}
@Override

View File

@ -36,7 +36,12 @@ public interface Connector extends Service {
* @return the statistics for this connector
*/
ConnectorStatistics getStatistics();
/**
* Reset Connector statistics
*/
void resetStatistics();
/**
* @return true if update client connections when brokers leave/join a cluster
*/
@ -46,13 +51,13 @@ public interface Connector extends Service {
* @return true if clients should be re-balanced across the cluster
*/
public boolean isRebalanceClusterClients();
/**
* Update all the connections with information
* about the connected brokers in the cluster
*/
public void updateClientClusterInfo();
/**
* @return true if clients should be updated when
* a broker is removed from a broker
@ -66,10 +71,12 @@ public interface Connector extends Service {
* @return true/false if link stealing is enabled
*/
boolean isAllowLinkStealing();
/**
* @return The comma separated string of regex patterns to match
* broker names for cluster client updates
*/
String getUpdateClusterFilter();
long getMaxConnectionExceededCount();
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.broker;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@ -101,7 +100,6 @@ import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.NetworkBridgeUtils;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

View File

@ -190,6 +190,15 @@ public class TransportConnector implements Connector, BrokerServiceAware {
return statistics;
}
/**
* Reset the statistics for this connector
*/
@Override
public void resetStatistics() {
statistics.reset();
server.resetStatistics();
}
public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
return messageAuthorizationPolicy;
}
@ -673,4 +682,9 @@ public class TransportConnector implements Connector, BrokerServiceAware {
public void setDisplayStackTrace(boolean displayStackTrace) {
this.displayStackTrace = displayStackTrace;
}
@Override
public long getMaxConnectionExceededCount() {
return (server != null ? server.getMaxConnectionExceededCount() : 0l);
}
}

View File

@ -54,7 +54,7 @@ public class ConnectorView implements ConnectorViewMBean {
*/
@Override
public void resetStatistics() {
connector.getStatistics().reset();
connector.resetStatistics();
}
/**
@ -136,4 +136,9 @@ public class ConnectorView implements ConnectorViewMBean {
public boolean isAllowLinkStealingEnabled() {
return this.connector.isAllowLinkStealing();
}
@Override
public long getMaxConnectionExceededCount() {
return this.connector.getMaxConnectionExceededCount();
}
}

View File

@ -83,5 +83,10 @@ public interface ConnectorViewMBean extends Service {
@MBeanInfo("Comma separated list of regex patterns to match broker names for cluster client updates.")
String getUpdateClusterFilter();
/**
* @return The number of occurrences the max connection count
* has been exceed
*/
@MBeanInfo("Max connection exceeded count")
long getMaxConnectionExceededCount();
}

View File

@ -155,4 +155,15 @@ public class VMTransportServer implements TransportServer {
public void setAllowLinkStealing(boolean allowLinkStealing) {
this.allowLinkStealing = allowLinkStealing;
}
@Override
public long getMaxConnectionExceededCount() {
// VM transport is not limited
return -1l;
}
@Override
public void resetStatistics() {
// VM transport does not implement statistics
}
}

View File

@ -73,4 +73,8 @@ public interface TransportServer extends Service {
* @return true if allow link stealing is enabled.
*/
boolean isAllowLinkStealing();
long getMaxConnectionExceededCount();
void resetStatistics();
}

View File

@ -32,35 +32,53 @@ public class TransportServerFilter implements TransportServer {
this.next = next;
}
@Override
public URI getConnectURI() {
return next.getConnectURI();
}
@Override
public void setAcceptListener(TransportAcceptListener acceptListener) {
next.setAcceptListener(acceptListener);
}
@Override
public void setBrokerInfo(BrokerInfo brokerInfo) {
next.setBrokerInfo(brokerInfo);
}
@Override
public void start() throws Exception {
next.start();
}
@Override
public void stop() throws Exception {
next.stop();
}
@Override
public InetSocketAddress getSocketAddress() {
return next.getSocketAddress();
}
@Override
public boolean isSslServer() {
return next.isSslServer();
}
@Override
public boolean isAllowLinkStealing() {
return next.isAllowLinkStealing();
}
@Override
public long getMaxConnectionExceededCount() {
return next.getMaxConnectionExceededCount();
}
@Override
public void resetStatistics() {
next.resetStatistics();
}
}

View File

@ -38,6 +38,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLParameters;
@ -123,6 +124,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
* The maximum number of sockets allowed for this server
*/
protected int maximumConnections = Integer.MAX_VALUE;
protected final AtomicLong maximumConnectionsExceededCount = new AtomicLong(0l);
protected final AtomicInteger currentTransportCount = new AtomicInteger();
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
@ -579,10 +581,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
do {
currentCount = currentTransportCount.get();
if (currentCount >= this.maximumConnections) {
throw new ExceededMaximumConnectionsException(
"Exceeded the maximum number of allowed client connections. See the '" +
"maximumConnections' property on the TCP transport configuration URI " +
"in the ActiveMQ configuration file (e.g., activemq.xml)");
this.maximumConnectionsExceededCount.incrementAndGet();
throw new ExceededMaximumConnectionsException(
"Exceeded the maximum number of allowed client connections. See the '" +
"maximumConnections' property on the TCP transport configuration URI " +
"in the ActiveMQ configuration file (e.g., activemq.xml)");
}
//Increment this value before configuring the transport
@ -726,4 +729,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
public void setAllowLinkStealing(boolean allowLinkStealing) {
this.allowLinkStealing = allowLinkStealing;
}
@Override
public long getMaxConnectionExceededCount() {
return this.maximumConnectionsExceededCount.get();
}
@Override
public void resetStatistics() {
this.maximumConnectionsExceededCount.set(0l);
}
}

View File

@ -199,4 +199,15 @@ public class UdpTransportServer extends TransportServerSupport {
public void setAllowLinkStealing(boolean allowLinkStealing) {
this.allowLinkStealing = allowLinkStealing;
}
@Override
public long getMaxConnectionExceededCount() {
// UDP transport deprecated
return -1l;
}
@Override
public void resetStatistics() {
// UDP transport deprecated
}
}

View File

@ -189,4 +189,15 @@ public class HttpTransportServer extends WebTransportServerSupport {
public boolean isSslServer() {
return false;
}
@Override
public long getMaxConnectionExceededCount() {
// Max Connection Count not supported for http
return -1l;
}
@Override
public void resetStatistics() {
// Statistics not implemented for http
}
}

View File

@ -172,4 +172,15 @@ public class WSTransportServer extends WebTransportServerSupport implements Brok
servlet.setBrokerService(brokerService);
}
}
@Override
public long getMaxConnectionExceededCount() {
// Max Connection Count not supported for ws
return -1l;
}
@Override
public void resetStatistics() {
// Statistics not implemented for ws
}
}

View File

@ -147,6 +147,8 @@ public class AutoTransportConnectionsTest {
assertEquals(maxConnections, transportServer.getMaximumConnections());
// No connections at first
assertEquals(0, connector.getConnections().size());
// No connections exceeded at first
assertEquals(Long.valueOf(0l), Long.valueOf(connector.getMaxConnectionExceededCount()));
// Release the latch to set up connections in parallel
startupLatch.countDown();
@ -162,6 +164,12 @@ public class AutoTransportConnectionsTest {
})
);
// The 10 extra connections exceeded connection count
assertEquals(Long.valueOf(10l), Long.valueOf(connector.getMaxConnectionExceededCount()));
// Confirm reset statistics
connector.resetStatistics();
assertEquals(Long.valueOf(0l), Long.valueOf(connector.getMaxConnectionExceededCount()));
}
@Test