ACTIVEMQ6-96 acceptor limit

Adds a configuration property on both in-vm and Netty acceptors
whereby the number of connections allowed is configurable.
This commit is contained in:
jbertram 2015-05-04 11:15:33 -05:00
parent adb0b2bddb
commit 3eb835a8ab
7 changed files with 181 additions and 26 deletions

View File

@ -191,6 +191,10 @@ public class TransportConstants
public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1;
public static final String CONNECTIONS_ALLOWED = "connectionsAllowed";
public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
static
{
Set<String> allowableAcceptorKeys = new HashSet<String>();
@ -224,6 +228,7 @@ public class TransportConstants
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID);
allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());

View File

@ -64,6 +64,8 @@ public final class InVMAcceptor implements Acceptor
private ActiveMQPrincipal defaultActiveMQPrincipal;
private final long connectionsAllowed;
public InVMAcceptor(final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
final BufferHandler handler,
@ -81,6 +83,10 @@ public final class InVMAcceptor implements Acceptor
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
executorFactory = new OrderedExecutorFactory(threadPool);
connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED,
TransportConstants.DEFAULT_CONNECTIONS_ALLOWED,
configuration);
}
public Map<String, Object> getConfiguration()
@ -93,6 +99,16 @@ public final class InVMAcceptor implements Acceptor
return clusterConnection;
}
public long getConnectionsAllowed()
{
return connectionsAllowed;
}
public int getConnectionCount()
{
return connections.size();
}
public synchronized void start() throws Exception
{
if (started)

View File

@ -154,12 +154,21 @@ public class InVMConnector extends AbstractConnector
return null;
}
Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory()
.getExecutor());
if (acceptor.getConnectionsAllowed() == -1 || acceptor.getConnectionCount() < acceptor.getConnectionsAllowed())
{
Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory().getExecutor());
acceptor.connect((String)conn.getID(), handler, this, executorFactory.getExecutor());
return conn;
acceptor.connect((String) conn.getID(), handler, this, executorFactory.getExecutor());
return conn;
}
else
{
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(acceptor.getConnectionsAllowed()).append(" reached. Refusing connection."));
}
return null;
}
}
public synchronized void start()

View File

@ -23,6 +23,10 @@ public final class TransportConstants
public static final int DEFAULT_SERVER_ID = 0;
public static final String CONNECTIONS_ALLOWED = "connectionsAllowed";
public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
private TransportConstants()
{
// Utility class

View File

@ -168,6 +168,8 @@ public class NettyAcceptor implements Acceptor
private final boolean httpUpgradeEnabled;
private final long connectionsAllowed;
public NettyAcceptor(final String name,
final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
@ -288,6 +290,10 @@ public class NettyAcceptor implements Acceptor
httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME,
TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED,
configuration);
connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED,
TransportConstants.DEFAULT_CONNECTIONS_ALLOWED,
configuration);
}
public synchronized void start() throws Exception
@ -711,36 +717,47 @@ public class NettyAcceptor implements Acceptor
public NettyServerConnection createConnection(final ChannelHandlerContext ctx, String protocol, boolean httpEnabled) throws Exception
{
super.channelActive(ctx);
Listener connectionListener = new Listener();
NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver);
connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol);
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
if (sslHandler != null)
if (connectionsAllowed == -1 || connections.size() < connectionsAllowed)
{
sslHandler.handshakeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>()
super.channelActive(ctx);
Listener connectionListener = new Listener();
NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver);
connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol);
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
if (sslHandler != null)
{
public void operationComplete(final io.netty.util.concurrent.Future<Channel> future) throws Exception
sslHandler.handshakeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>()
{
if (future.isSuccess())
public void operationComplete(final io.netty.util.concurrent.Future<Channel> future) throws Exception
{
active = true;
if (future.isSuccess())
{
active = true;
}
else
{
future.getNow().close();
}
}
else
{
future.getNow().close();
}
}
});
});
}
else
{
active = true;
}
return nc;
}
else
{
active = true;
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(connectionsAllowed).append(" reached. Refusing connection from ").append(ctx.channel().remoteAddress()));
}
throw new Exception();
}
return nc;
}
}

View File

@ -285,6 +285,14 @@ Netty for simple TCP:
connector will let the system pick up an ephemeral port. valid ports
are 0 to 65535
- `connectionsAllowed`. This is only valid for acceptors. It limits the
number of connections which the acceptor will allow. When this limit
is reached a DEBUG level message is issued to the log, and the connection
is refused. The type of client in use will determine what happens when
the connection is refused. In the case of a `core` client, it will
result in a `org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException`.
## Configuring Netty SSL
Netty SSL is similar to the Netty TCP transport but it provides

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.tests.util.UnitTestCase;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class ConnectionLimitTest extends UnitTestCase
{
private ActiveMQServer server;
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
Map nettyParams = new HashMap();
nettyParams.put(TransportConstants.CONNECTIONS_ALLOWED, 1);
Map invmParams = new HashMap();
invmParams.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.CONNECTIONS_ALLOWED, 1);
Configuration configuration = createBasicConfig()
.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyParams))
.addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmParams));
server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
server.start();
}
@Test
public void testInVMConnectionLimit() throws Exception
{
ServerLocator locator = addServerLocator(createNonHALocator(false));
ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
try
{
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
fail("creating a session factory here should fail");
}
catch (Exception e)
{
assertTrue(e instanceof ActiveMQNotConnectedException);
}
}
@Test
public void testNettyConnectionLimit() throws Exception
{
ServerLocator locator = addServerLocator(createNonHALocator(true));
locator.setCallTimeout(3000);
ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
ClientSession clientSession = addClientSession(clientSessionFactory.createSession());
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
try
{
ClientSession extraClientSession = addClientSession(extraClientSessionFactory.createSession());
fail("creating a session here should fail");
}
catch (Exception e)
{
assertTrue(e instanceof ActiveMQConnectionTimedOutException);
}
}
}