From 3eb835a8ab86cca7de3848a003dd70c15f0d186d Mon Sep 17 00:00:00 2001 From: jbertram Date: Mon, 4 May 2015 11:15:33 -0500 Subject: [PATCH 1/3] ACTIVEMQ6-96 acceptor limit Adds a configuration property on both in-vm and Netty acceptors whereby the number of connections allowed is configurable. --- .../impl/netty/TransportConstants.java | 5 + .../core/remoting/impl/invm/InVMAcceptor.java | 16 ++++ .../remoting/impl/invm/InVMConnector.java | 19 +++- .../impl/invm/TransportConstants.java | 4 + .../remoting/impl/netty/NettyAcceptor.java | 59 ++++++++---- docs/user-manual/en/configuring-transports.md | 8 ++ .../server/ConnectionLimitTest.java | 96 +++++++++++++++++++ 7 files changed, 181 insertions(+), 26 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index b7299ede51..74e435fd98 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -191,6 +191,10 @@ public class TransportConstants public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1; + public static final String CONNECTIONS_ALLOWED = "connectionsAllowed"; + + public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L; + static { Set allowableAcceptorKeys = new HashSet(); @@ -224,6 +228,7 @@ public class TransportConstants allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL); allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID); + allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword()); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java index 1c5a53bb8c..857f5038e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java @@ -64,6 +64,8 @@ public final class InVMAcceptor implements Acceptor private ActiveMQPrincipal defaultActiveMQPrincipal; + private final long connectionsAllowed; + public InVMAcceptor(final ClusterConnection clusterConnection, final Map configuration, final BufferHandler handler, @@ -81,6 +83,10 @@ public final class InVMAcceptor implements Acceptor id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration); executorFactory = new OrderedExecutorFactory(threadPool); + + connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED, + TransportConstants.DEFAULT_CONNECTIONS_ALLOWED, + configuration); } public Map getConfiguration() @@ -93,6 +99,16 @@ public final class InVMAcceptor implements Acceptor return clusterConnection; } + public long getConnectionsAllowed() + { + return connectionsAllowed; + } + + public int getConnectionCount() + { + return connections.size(); + } + public synchronized void start() throws Exception { if (started) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java index fb1b4ad717..f9b746263a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java @@ -154,12 +154,21 @@ public class InVMConnector extends AbstractConnector return null; } - Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory() - .getExecutor()); + if (acceptor.getConnectionsAllowed() == -1 || acceptor.getConnectionCount() < acceptor.getConnectionsAllowed()) + { + Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory().getExecutor()); - acceptor.connect((String)conn.getID(), handler, this, executorFactory.getExecutor()); - - return conn; + acceptor.connect((String) conn.getID(), handler, this, executorFactory.getExecutor()); + return conn; + } + else + { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(acceptor.getConnectionsAllowed()).append(" reached. Refusing connection.")); + } + return null; + } } public synchronized void start() diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java index d01f5a8bc4..7905c9c36d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java @@ -23,6 +23,10 @@ public final class TransportConstants public static final int DEFAULT_SERVER_ID = 0; + public static final String CONNECTIONS_ALLOWED = "connectionsAllowed"; + + public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L; + private TransportConstants() { // Utility class diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 8ed06be4eb..53a8d69f3c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -168,6 +168,8 @@ public class NettyAcceptor implements Acceptor private final boolean httpUpgradeEnabled; + private final long connectionsAllowed; + public NettyAcceptor(final String name, final ClusterConnection clusterConnection, final Map configuration, @@ -288,6 +290,10 @@ public class NettyAcceptor implements Acceptor httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration); + + connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED, + TransportConstants.DEFAULT_CONNECTIONS_ALLOWED, + configuration); } public synchronized void start() throws Exception @@ -711,36 +717,47 @@ public class NettyAcceptor implements Acceptor public NettyServerConnection createConnection(final ChannelHandlerContext ctx, String protocol, boolean httpEnabled) throws Exception { - super.channelActive(ctx); - Listener connectionListener = new Listener(); - - NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver); - - connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol); - - SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); - if (sslHandler != null) + if (connectionsAllowed == -1 || connections.size() < connectionsAllowed) { - sslHandler.handshakeFuture().addListener(new GenericFutureListener>() + super.channelActive(ctx); + Listener connectionListener = new Listener(); + + NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver); + + connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol); + + SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); + if (sslHandler != null) { - public void operationComplete(final io.netty.util.concurrent.Future future) throws Exception + sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { - if (future.isSuccess()) + public void operationComplete(final io.netty.util.concurrent.Future future) throws Exception { - active = true; + if (future.isSuccess()) + { + active = true; + } + else + { + future.getNow().close(); + } } - else - { - future.getNow().close(); - } - } - }); + }); + } + else + { + active = true; + } + return nc; } else { - active = true; + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(connectionsAllowed).append(" reached. Refusing connection from ").append(ctx.channel().remoteAddress())); + } + throw new Exception(); } - return nc; } } diff --git a/docs/user-manual/en/configuring-transports.md b/docs/user-manual/en/configuring-transports.md index cf046274dd..6ac2886cc1 100644 --- a/docs/user-manual/en/configuring-transports.md +++ b/docs/user-manual/en/configuring-transports.md @@ -285,6 +285,14 @@ Netty for simple TCP: connector will let the system pick up an ephemeral port. valid ports are 0 to 65535 +- `connectionsAllowed`. This is only valid for acceptors. It limits the + number of connections which the acceptor will allow. When this limit + is reached a DEBUG level message is issued to the log, and the connection + is refused. The type of client in use will determine what happens when + the connection is refused. In the case of a `core` client, it will + result in a `org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException`. + + ## Configuring Netty SSL Netty SSL is similar to the Netty TCP transport but it provides diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java new file mode 100644 index 0000000000..0832f24f25 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.server; + +import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException; +import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.tests.util.UnitTestCase; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class ConnectionLimitTest extends UnitTestCase +{ + private ActiveMQServer server; + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + + Map nettyParams = new HashMap(); + nettyParams.put(TransportConstants.CONNECTIONS_ALLOWED, 1); + + Map invmParams = new HashMap(); + invmParams.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.CONNECTIONS_ALLOWED, 1); + + Configuration configuration = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyParams)) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmParams)); + + server = addServer(ActiveMQServers.newActiveMQServer(configuration, false)); + server.start(); + } + + @Test + public void testInVMConnectionLimit() throws Exception + { + ServerLocator locator = addServerLocator(createNonHALocator(false)); + ClientSessionFactory clientSessionFactory = locator.createSessionFactory(); + + try + { + ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory(); + fail("creating a session factory here should fail"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQNotConnectedException); + } + } + + @Test + public void testNettyConnectionLimit() throws Exception + { + ServerLocator locator = addServerLocator(createNonHALocator(true)); + locator.setCallTimeout(3000); + ClientSessionFactory clientSessionFactory = locator.createSessionFactory(); + ClientSession clientSession = addClientSession(clientSessionFactory.createSession()); + ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory(); + + try + { + ClientSession extraClientSession = addClientSession(extraClientSessionFactory.createSession()); + fail("creating a session here should fail"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQConnectionTimedOutException); + } + } +} \ No newline at end of file From 57d29ed119fe2bdf750911efa02b9eb2966fa260 Mon Sep 17 00:00:00 2001 From: jbertram Date: Mon, 4 May 2015 11:51:55 -0500 Subject: [PATCH 2/3] Point test configs to right directory --- artemis-cli/src/test/resources/broker-nojms.xml | 8 ++++---- artemis-cli/src/test/resources/broker.xml | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/artemis-cli/src/test/resources/broker-nojms.xml b/artemis-cli/src/test/resources/broker-nojms.xml index c4cbd62225..d25c75cf2d 100644 --- a/artemis-cli/src/test/resources/broker-nojms.xml +++ b/artemis-cli/src/test/resources/broker-nojms.xml @@ -20,15 +20,15 @@ under the License. - ${data.dir:../data}/paging + ./target/paging - ${data.dir:../data}/bindings + ./target/bindings - ${data.dir:../data}/journal + ./target/journal 10 - ${data.dir:../data}/large-messages + ./target/large-messages diff --git a/artemis-cli/src/test/resources/broker.xml b/artemis-cli/src/test/resources/broker.xml index 219c0b2752..b420ad8478 100644 --- a/artemis-cli/src/test/resources/broker.xml +++ b/artemis-cli/src/test/resources/broker.xml @@ -24,15 +24,15 @@ under the License. - ${data.dir:../data}/paging + ./target/paging - ${data.dir:../data}/bindings + ./target/bindings - ${data.dir:../data}/journal + ./target/journal 10 - ${data.dir:../data}/large-messages + ./target/large-messages From f509ce75191587458c59c7b28df2c1619de16174 Mon Sep 17 00:00:00 2001 From: jbertram Date: Mon, 4 May 2015 13:55:14 -0500 Subject: [PATCH 3/3] ACTIVEMQ6-70 broker resource limits Implements basic limits on the number of connections and number of queues a particular user can create to/on the broker. --- .../openwire/OpenWireProtocolManager.java | 4 +- .../openwire/amq/AMQServerSession.java | 2 +- .../core/protocol/stomp/StompConnection.java | 2 +- .../artemis/core/config/Configuration.java | 16 ++ .../core/config/impl/ConfigurationImpl.java | 23 +++ .../impl/FileConfigurationParser.java | 54 ++++++ .../core/persistence/QueueBindingInfo.java | 2 + .../impl/journal/JournalStorageManager.java | 41 ++++- .../core/server/ActiveMQMessageBundle.java | 6 + .../artemis/core/server/ActiveMQServer.java | 9 + .../activemq/artemis/core/server/Queue.java | 5 + .../artemis/core/server/QueueFactory.java | 1 + .../core/server/impl/ActiveMQServerImpl.java | 93 +++++++++- .../core/server/impl/LastValueQueue.java | 2 + .../server/impl/PostOfficeJournalLoader.java | 1 + .../core/server/impl/QueueFactoryImpl.java | 3 + .../artemis/core/server/impl/QueueImpl.java | 11 ++ .../core/server/impl/ServerSessionImpl.java | 10 +- .../settings/impl/ResourceLimitSettings.java | 173 ++++++++++++++++++ .../schema/artemis-configuration.xsd | 48 +++++ .../config/impl/FileConfigurationTest.java | 3 + .../impl/ScheduledDeliveryHandlerTest.java | 6 + .../ConfigurationTest-full-config.xml | 6 + docs/user-manual/en/SUMMARY.md | 1 + docs/user-manual/en/resource-limits.md | 27 +++ .../integration/client/DurableQueueTest.java | 21 +++ .../integration/client/HangConsumerTest.java | 6 +- .../client/InterruptedLargeMessageTest.java | 4 + .../jms/client/TopicCleanupTest.java | 2 +- .../integration/server/ResourceLimitTest.java | 131 +++++++++++++ .../core/server/impl/QueueConcurrentTest.java | 1 + .../unit/core/postoffice/impl/FakeQueue.java | 6 + .../unit/core/server/impl/QueueImplTest.java | 1 + .../server/impl/fakes/FakeQueueFactory.java | 2 + 34 files changed, 710 insertions(+), 13 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/ResourceLimitSettings.java create mode 100644 docs/user-manual/en/resource-limits.md create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index bf2ff294a4..5991ddce88 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -664,8 +664,10 @@ public class OpenWireProtocolManager implements ProtocolManager AMQServerSession fakeSession = new AMQServerSession(user, pass); CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; ((ActiveMQServerImpl) server).getSecurityStore().check(qName, checkType, fakeSession); + + ((ActiveMQServerImpl) server).checkQueueCreationLimit(user); } - this.server.createQueue(qName, qName, null, false, true); + this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, true); if (dest.isTemporary()) { connection.registerTempQueue(qName); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java index 61799c3eb2..6ee053f5b0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java @@ -343,7 +343,7 @@ public class AMQServerSession extends ServerSessionImpl return; } - server.createQueue(address, name, filterString, durable, temporary); + server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary); if (temporary) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 8f2c08a4df..ef269e2c77 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -262,7 +262,7 @@ public final class StompConnection implements RemotingConnection SimpleString queueName = new SimpleString(queue); try { - manager.getServer().createQueue(queueName, queueName, null, true, false, true); + manager.getServer().createQueue(queueName, queueName, SimpleString.toSimpleString(this.getLogin()), null, true, false, true); } catch (Exception e) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 80298d1a9b..4accdb3db5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; /** * A Configuration is used to configure ActiveMQ servers. @@ -73,6 +74,21 @@ public interface Configuration */ Configuration setPersistenceEnabled(boolean enable); + /** + * @return usernames mapped to ResourceLimitSettings + */ + Map getResourceLimitSettings(); + + /** + * @param resourceLimitSettings usernames mapped to ResourceLimitSettings + */ + Configuration setResourceLimitSettings(Map resourceLimitSettings); + + /** + * @param resourceLimitSettings usernames mapped to ResourceLimitSettings + */ + Configuration addResourceLimitSettings(ResourceLimitSettings resourceLimitSettings); + /** * Returns the period (in milliseconds) to scan configuration files used by deployment.
* Default value is {@value org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_FILE_DEPLOYER_SCAN_PERIOD}. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index eef60282db..47ac086dbe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; public class ConfigurationImpl implements Configuration, Serializable { @@ -200,6 +201,8 @@ public class ConfigurationImpl implements Configuration, Serializable private Map addressesSettings = new HashMap(); + private Map resourceLimitSettings = new HashMap(); + private Map> securitySettings = new HashMap>(); protected List connectorServiceConfigurations = new ArrayList(); @@ -1023,6 +1026,26 @@ public class ConfigurationImpl implements Configuration, Serializable return this; } + @Override + public Map getResourceLimitSettings() + { + return resourceLimitSettings; + } + + @Override + public ConfigurationImpl setResourceLimitSettings(final Map resourceLimitSettings) + { + this.resourceLimitSettings = resourceLimitSettings; + return this; + } + + @Override + public ConfigurationImpl addResourceLimitSettings(ResourceLimitSettings resourceLimitSettings) + { + this.resourceLimitSettings.put(resourceLimitSettings.getMatch().toString(), resourceLimitSettings); + return this; + } + @Override public Map> getSecurityRoles() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 8a2969b01e..75a36da014 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser; import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser; @@ -147,6 +148,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues"; + private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections"; + + private static final String MAX_QUEUES_NODE_NAME = "max-queues"; + // Attributes ---------------------------------------------------- private boolean validateAIO = false; @@ -611,6 +616,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil parseAddressSettings(e, config); + parseResourceLimits(e, config); + parseQueues(e, config); parseSecurity(e, config); @@ -689,6 +696,25 @@ public final class FileConfigurationParser extends XMLConfigurationUtil } } + /** + * @param e + * @param config + */ + private void parseResourceLimits(final Element e, final Configuration config) + { + NodeList elements = e.getElementsByTagName("resource-limit-settings"); + + if (elements.getLength() != 0) + { + Element node = (Element) elements.item(0); + NodeList list = node.getElementsByTagName("resource-limit-setting"); + for (int i = 0; i < list.getLength(); i++) + { + config.addResourceLimitSettings(parseResourceLimitSettings(list.item(i))); + } + } + } + /** * @param node * @return @@ -903,6 +929,34 @@ public final class FileConfigurationParser extends XMLConfigurationUtil return setting; } + /** + * @param node + * @return + */ + protected ResourceLimitSettings parseResourceLimitSettings(final Node node) + { + ResourceLimitSettings resourceLimitSettings = new ResourceLimitSettings(); + + resourceLimitSettings.setMatch(SimpleString.toSimpleString(getAttributeValue(node, "match"))); + + NodeList children = node.getChildNodes(); + + for (int i = 0; i < children.getLength(); i++) + { + final Node child = children.item(i); + final String name = child.getNodeName(); + if (MAX_CONNECTIONS_NODE_NAME.equalsIgnoreCase(name)) + { + resourceLimitSettings.setMaxConnections(XMLUtil.parseInt(child)); + } + else if (MAX_QUEUES_NODE_NAME.equalsIgnoreCase(name)) + { + resourceLimitSettings.setMaxQueues(XMLUtil.parseInt(child)); + } + } + return resourceLimitSettings; + } + protected CoreQueueConfiguration parseQueueConfiguration(final Node node) { String name = getAttributeValue(node, "name"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index 6e21b3bf97..3fb080fe48 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -36,4 +36,6 @@ public interface QueueBindingInfo boolean isAutoCreated(); + SimpleString getUser(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index a1a9f60290..a4911e9aff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -2027,6 +2027,7 @@ public class JournalStorageManager implements StorageManager PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, + queue.getUser(), queue.isAutoCreated()); readLock(); @@ -3045,6 +3046,8 @@ public class JournalStorageManager implements StorageManager public boolean autoCreated; + public SimpleString user; + public PersistentQueueBindingEncoding() { } @@ -3059,6 +3062,8 @@ public class JournalStorageManager implements StorageManager address + ", filterString=" + filterString + + ", user=" + + user + ", autoCreated=" + autoCreated + "]"; @@ -3067,11 +3072,13 @@ public class JournalStorageManager implements StorageManager public PersistentQueueBindingEncoding(final SimpleString name, final SimpleString address, final SimpleString filterString, + final SimpleString user, final boolean autoCreated) { this.name = name; this.address = address; this.filterString = filterString; + this.user = user; this.autoCreated = autoCreated; } @@ -3105,6 +3112,11 @@ public class JournalStorageManager implements StorageManager return name; } + public SimpleString getUser() + { + return user; + } + public boolean isAutoCreated() { return autoCreated; @@ -3115,6 +3127,24 @@ public class JournalStorageManager implements StorageManager name = buffer.readSimpleString(); address = buffer.readSimpleString(); filterString = buffer.readNullableSimpleString(); + + String metadata = buffer.readNullableSimpleString().toString(); + if (metadata != null) + { + String[] elements = metadata.split(";"); + for (String element : elements) + { + String[] keyValuePair = element.split("="); + if (keyValuePair.length == 2) + { + if (keyValuePair[0].equals("user")) + { + user = SimpleString.toSimpleString(keyValuePair[1]); + } + } + } + } + autoCreated = buffer.readBoolean(); } @@ -3123,13 +3153,22 @@ public class JournalStorageManager implements StorageManager buffer.writeSimpleString(name); buffer.writeSimpleString(address); buffer.writeNullableSimpleString(filterString); + buffer.writeNullableSimpleString(createMetadata()); buffer.writeBoolean(autoCreated); } public int getEncodeSize() { return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) + - SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN; + SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN + + SimpleString.sizeofNullableString(createMetadata()); + } + + private SimpleString createMetadata() + { + StringBuilder metadata = new StringBuilder(); + metadata.append("user=").append(user).append(";"); + return SimpleString.toSimpleString(metadata.toString()); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 4e7ee0d005..e4396c2b90 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -353,4 +353,10 @@ public interface ActiveMQMessageBundle @Message(id = 119109, value = "unsupported HA Policy Configuration {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQIllegalStateException unsupportedHAPolicyConfiguration(Object o); + + @Message(id = 119110, value = "Too many sessions for user ''{0}''. Sessions allowed: {1}.", format = Message.Format.MESSAGE_FORMAT) + ActiveMQSessionCreationException sessionLimitReached(String username, int limit); + + @Message(id = 119111, value = "Too many queues created by user ''{0}''. Queues allowed: {1}.", format = Message.Format.MESSAGE_FORMAT) + ActiveMQSessionCreationException queueLimitReached(String username, int limit); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index b804b4ce4a..bbc66b54d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -181,6 +181,7 @@ public interface ActiveMQServer extends ActiveMQComponent void createSharedQueue(final SimpleString address, final SimpleString name, final SimpleString filterString, + final SimpleString user, boolean durable) throws Exception; Queue createQueue(SimpleString address, @@ -192,6 +193,14 @@ public interface ActiveMQServer extends ActiveMQComponent Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary) throws Exception; + + Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + SimpleString user, boolean durable, boolean temporary, boolean autoCreated) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index c840a375b5..021b346fd8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -235,4 +235,9 @@ public interface Queue extends Bindable void postAcknowledge(MessageReference ref); float getRate(); + + /** + * @return the user who created this queue + */ + SimpleString getUser(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java index 77799914d6..2b62efdbea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java @@ -35,6 +35,7 @@ public interface QueueFactory SimpleString name, Filter filter, PageSubscription pageSubscription, + SimpleString user, boolean durable, boolean temporary, boolean autoCreated); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 7eb5c13539..7058168ec0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -117,6 +117,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; import org.apache.activemq.artemis.core.transaction.ResourceManager; +import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl; import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; @@ -1045,6 +1046,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { securityStore.authenticate(username, password); } + + checkSessionLimit(username); + final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor()); final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, @@ -1055,6 +1059,73 @@ public class ActiveMQServerImpl implements ActiveMQServer return session; } + private void checkSessionLimit(String username) throws Exception + { + if (configuration.getResourceLimitSettings() != null && configuration.getResourceLimitSettings().containsKey(username)) + { + ResourceLimitSettings limits = configuration.getResourceLimitSettings().get(username); + + if (limits.getMaxConnections() == -1) + { + return; + } + else if (limits.getMaxConnections() == 0 || getSessionCountForUser(username) >= limits.getMaxConnections()) + { + throw ActiveMQMessageBundle.BUNDLE.sessionLimitReached(username, limits.getMaxConnections()); + } + } + } + + private int getSessionCountForUser(String username) + { + int sessionCount = 0; + + for (Entry sessionEntry : sessions.entrySet()) + { + if (sessionEntry.getValue().getUsername().toString().equals(username)) + { + sessionCount++; + } + } + + return sessionCount; + } + + public void checkQueueCreationLimit(String username) throws Exception + { + if (configuration.getResourceLimitSettings() != null && configuration.getResourceLimitSettings().containsKey(username)) + { + ResourceLimitSettings limits = configuration.getResourceLimitSettings().get(username); + + if (limits.getMaxQueues() == -1) + { + return; + } + else if (limits.getMaxQueues() == 0 || getQueueCountForUser(username) >= limits.getMaxQueues()) + { + throw ActiveMQMessageBundle.BUNDLE.queueLimitReached(username, limits.getMaxConnections()); + } + } + } + + public int getQueueCountForUser(String username) throws Exception + { + Map bindings = postOffice.getAllBindings(); + + int queuesForUser = 0; + + for (Binding binding : bindings.values()) + { + if (binding instanceof LocalQueueBinding && ((LocalQueueBinding) binding).getQueue().getUser().equals(SimpleString.toSimpleString(username))) + { + queuesForUser++; + } + } + + return queuesForUser; + + } + protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, @@ -1207,17 +1278,28 @@ public class ActiveMQServerImpl implements ActiveMQServer final boolean durable, final boolean temporary) throws Exception { - return createQueue(address, queueName, filterString, durable, temporary, false, false, false); + return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false); } public Queue createQueue(final SimpleString address, final SimpleString queueName, final SimpleString filterString, + final SimpleString user, + final boolean durable, + final boolean temporary) throws Exception + { + return createQueue(address, queueName, filterString, user, durable, temporary, false, false, false); + } + + public Queue createQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final SimpleString user, final boolean durable, final boolean temporary, final boolean autoCreated) throws Exception { - return createQueue(address, queueName, filterString, durable, temporary, false, false, autoCreated); + return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated); } /** @@ -1235,9 +1317,10 @@ public class ActiveMQServerImpl implements ActiveMQServer public void createSharedQueue(final SimpleString address, final SimpleString name, final SimpleString filterString, + final SimpleString user, boolean durable) throws Exception { - Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable, false); + Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false); if (!queue.getAddress().equals(address)) { @@ -1286,7 +1369,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { ActiveMQServerLogger.LOGGER.deployQueue(queueName); - return createQueue(address, queueName, filterString, durable, temporary, true, false, false); + return createQueue(address, queueName, filterString, null, durable, temporary, true, false, false); } public void destroyQueue(final SimpleString queueName) throws Exception @@ -1960,6 +2043,7 @@ public class ActiveMQServerImpl implements ActiveMQServer private Queue createQueue(final SimpleString address, final SimpleString queueName, final SimpleString filterString, + final SimpleString user, final boolean durable, final boolean temporary, final boolean ignoreIfExists, @@ -2003,6 +2087,7 @@ public class ActiveMQServerImpl implements ActiveMQServer queueName, filter, pageSubscription, + user, durable, temporary, autoCreated); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 622cdd1d05..da96fc72e4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -51,6 +51,7 @@ public class LastValueQueue extends QueueImpl final SimpleString name, final Filter filter, final PageSubscription pageSubscription, + final SimpleString user, final boolean durable, final boolean temporary, final boolean autoCreated, @@ -65,6 +66,7 @@ public class LastValueQueue extends QueueImpl name, filter, pageSubscription, + user, durable, temporary, autoCreated, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 88df7a18cb..93f2a3f0f4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -155,6 +155,7 @@ public class PostOfficeJournalLoader implements JournalLoader queueBindingInfo.getQueueName(), filter, subscription, + queueBindingInfo.getUser(), true, false, queueBindingInfo.isAutoCreated()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index ad8bba2a29..afc0ac1852 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -70,6 +70,7 @@ public class QueueFactoryImpl implements QueueFactory final SimpleString name, final Filter filter, final PageSubscription pageSubscription, + final SimpleString user, final boolean durable, final boolean temporary, final boolean autoCreated) @@ -84,6 +85,7 @@ public class QueueFactoryImpl implements QueueFactory name, filter, pageSubscription, + user, durable, temporary, autoCreated, @@ -100,6 +102,7 @@ public class QueueFactoryImpl implements QueueFactory name, filter, pageSubscription, + user, durable, temporary, autoCreated, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 7c007f8851..226cf76287 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -119,6 +119,8 @@ public class QueueImpl implements Queue private final SimpleString name; + private final SimpleString user; + private volatile Filter filter; private final boolean durable; @@ -309,6 +311,7 @@ public class QueueImpl implements Queue final SimpleString address, final SimpleString name, final Filter filter, + final SimpleString user, final boolean durable, final boolean temporary, final boolean autoCreated, @@ -323,6 +326,7 @@ public class QueueImpl implements Queue name, filter, null, + user, durable, temporary, autoCreated, @@ -338,6 +342,7 @@ public class QueueImpl implements Queue final SimpleString name, final Filter filter, final PageSubscription pageSubscription, + final SimpleString user, final boolean durable, final boolean temporary, final boolean autoCreated, @@ -395,6 +400,7 @@ public class QueueImpl implements Queue this.executor = executor; + this.user = user; } // Bindable implementation ------------------------------------------------------------------------------------- @@ -409,6 +415,11 @@ public class QueueImpl implements Queue return name; } + public SimpleString getUser() + { + return user; + } + public boolean isExclusive() { return false; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 2d07ec87db..a6cd5cf85c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -559,14 +559,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this); } + ((ActiveMQServerImpl)server).checkQueueCreationLimit(getUsername()); + // any non-temporary JMS queue created via this method should be marked as auto-created if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) { - server.createQueue(address, name, filterString, durable, temporary, true); + server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true); } else { - server.createQueue(address, name, filterString, durable, temporary); + server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary); } if (temporary) @@ -602,7 +604,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this); - server.createSharedQueue(address, name, filterString, durable); + ((ActiveMQServerImpl)server).checkQueueCreationLimit(getUsername()); + + server.createSharedQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable); } public RemotingConnection getRemotingConnection() diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/ResourceLimitSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/ResourceLimitSettings.java new file mode 100644 index 0000000000..685858e3c1 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/ResourceLimitSettings.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.settings.impl; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.utils.BufferHelper; + +import java.io.Serializable; + +public class ResourceLimitSettings implements Serializable, EncodingSupport +{ + private static final long serialVersionUID = -110638321333856932L; + + public static final SimpleString DEFAULT_MATCH = null; + + public static final Integer DEFAULT_MAX_CONNECTIONS = -1; + + public static final Integer DEFAULT_MAX_QUEUES = -1; + +// public static final Long DEFAULT_MAX_QUEUE_SIZE_BYTES = -1L; + +// public static final SimpleString DEFAULT_QUEUE_NAME_REGEX = new SimpleString(".+"); + + SimpleString match = null; + + Integer maxConnections = null; + + Integer maxQueues = null; + +// Long maxQueueSizeBytes = null; + +// SimpleString queueNameRegex = null; + + public SimpleString getMatch() + { + return match != null ? match : DEFAULT_MATCH; + } + + public int getMaxConnections() + { + return maxConnections != null ? maxConnections : DEFAULT_MAX_CONNECTIONS; + } + + public int getMaxQueues() + { + return maxQueues != null ? maxQueues : DEFAULT_MAX_QUEUES; + } + +// public long getMaxQueueSizeBytes() +// { +// return maxQueueSizeBytes != null ? maxQueueSizeBytes : DEFAULT_MAX_QUEUE_SIZE_BYTES; +// } +// +// public SimpleString getQueueNameRegex() +// { +// return queueNameRegex != null ? queueNameRegex : DEFAULT_QUEUE_NAME_REGEX; +// } + + public void setMatch(SimpleString match) + { + this.match = match; + } + + public void setMaxConnections(int maxConnections) + { + this.maxConnections = maxConnections; + } + + public void setMaxQueues(int maxQueues) + { + this.maxQueues = maxQueues; + } + +// public void setMaxQueueSizeBytes(long maxQueueSizeBytes) +// { +// this.maxQueueSizeBytes = maxQueueSizeBytes; +// } +// +// public void setQueueNameRegex(SimpleString queueNameRegex) +// { +// this.queueNameRegex = queueNameRegex; +// } + + @Override + public int getEncodeSize() + { + return SimpleString.sizeofNullableString(match) + + BufferHelper.sizeOfNullableInteger(maxConnections) + + BufferHelper.sizeOfNullableInteger(maxQueues); +// BufferHelper.sizeOfNullableLong(maxQueueSizeBytes) + +// SimpleString.sizeofNullableString(queueNameRegex); + } + + @Override + public void encode(ActiveMQBuffer buffer) + { + buffer.writeNullableSimpleString(match); + + BufferHelper.writeNullableInteger(buffer, maxConnections); + + BufferHelper.writeNullableInteger(buffer, maxQueues); + +// BufferHelper.writeNullableLong(buffer, maxQueueSizeBytes); + +// buffer.writeNullableSimpleString(queueNameRegex); + } + + @Override + public void decode(ActiveMQBuffer buffer) + { + match = buffer.readNullableSimpleString(); + + maxConnections = BufferHelper.readNullableInteger(buffer); + + maxQueues = BufferHelper.readNullableInteger(buffer); + +// maxQueueSizeBytes = BufferHelper.readNullableLong(buffer); + +// queueNameRegex = buffer.readNullableSimpleString(); + } + + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((match == null) ? 0 : match.hashCode()); + result = prime * result + ((maxConnections == null) ? 0 : maxConnections.hashCode()); + result = prime * result + ((maxQueues == null) ? 0 : maxQueues.hashCode()); +// result = prime * result + ((maxQueueSizeBytes == null) ? 0 : maxQueueSizeBytes.hashCode()); +// result = prime * result + ((queueNameRegex == null) ? 0 : queueNameRegex.hashCode()); + return result; + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() + { + return "ResourceLimitSettings [match=" + match + + ", maxConnections=" + + maxConnections + + ", maxQueues=" + + maxQueues + +// ", maxQueueSizeBytes=" + +// maxQueueSizeBytes + +// ", queueNameRegex=" + +// queueNameRegex + + "]"; + } +} \ No newline at end of file diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index e9b4c130d3..4f26c763c3 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -728,6 +728,19 @@ + + + + a list of resource limit settings + + + + + + + + + @@ -2103,6 +2116,41 @@ + + + + + Complex type element to configure resource limits for a particular user. + + + + + + + how many connections are allowed by the matched entity (-1 means no limit, default is -1) + + + + + + + + how many queues can be created by the matched entity (-1 means no limit, default is -1) + + + + + + + + + the name of the user to whom the limits should be applied + + + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index b859617ff3..6f930680b0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -315,6 +315,9 @@ public class FileConfigurationTest extends ConfigurationImplTest assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues()); + assertTrue(conf.getResourceLimitSettings().containsKey("myUser")); + assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections()); + assertEquals(13, conf.getResourceLimitSettings().get("myUser").getMaxQueues()); assertEquals(2, conf.getQueueConfigurations().size()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 3486694e61..760497f3be 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1512,5 +1512,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { return 0.0f; } + + @Override + public SimpleString getUser() + { + return null; + } } } diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 06c9d95f72..75f636c91c 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -264,6 +264,12 @@ false + + + 104 + 13 + + org.foo diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md index 48724fcdb8..97e3ec3fd2 100644 --- a/docs/user-manual/en/SUMMARY.md +++ b/docs/user-manual/en/SUMMARY.md @@ -33,6 +33,7 @@ * [Extra Acknowledge Modes](pre-acknowledge.md) * [Management](management.md) * [Security](security.md) +* [Resource Limits](resource-limits.md) * [The JMS Bridge](jms-bridge.md) * [Client Reconnection and Session Reattachment](client-reconnection.md) * [Diverting and Splitting Message Flows](diverts.md) diff --git a/docs/user-manual/en/resource-limits.md b/docs/user-manual/en/resource-limits.md new file mode 100644 index 0000000000..443838f90c --- /dev/null +++ b/docs/user-manual/en/resource-limits.md @@ -0,0 +1,27 @@ +# Resource Limits + +Sometimes it's helpful to set particular limits on what certain users can +do beyond the normal security settings related to authorization and +authentication. For example, limiting how many connections a user can create +or how many queues a user can create. This chapter will explain how to +configure such limits. + +## Configuring Limits Via Resource Limit Settings + +Here is an example of the XML used to set resource limits: + + + + 5 + 3 + + + +Unlike the `match` from `address-setting`, this `match` does not use +any wild-card syntax. It's a simple 1:1 mapping of the limits to a user. + +`max-connections` defines how many connections the matched user can make +to the broker. The default is -1 which means there is no limit. + +`max-queues` defines how many queues the matched user can create. The default +is -1 which means there is no limit. \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DurableQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DurableQueueTest.java index 343ecd2b5a..4d571fc315 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DurableQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DurableQueueTest.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.activemq.artemis.tests.integration.client; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.junit.Before; import org.junit.Test; @@ -103,6 +104,26 @@ public class DurableQueueTest extends ServiceTestBase session.deleteQueue(queue); } + @Test + public void testUserEncoding() throws Exception + { + final String userName = "myUser"; + session.close(); + session = sf.createSession(userName, "myPass", false, true, true, false, 0); + + SimpleString queue = RandomUtil.randomSimpleString(); + SimpleString address = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, true); + + session.close(); + + server.stop(); + server.start(); + + assertEquals(1, ((ActiveMQServerImpl) server).getQueueCountForUser(userName)); + } + @Test public void testProduceAndConsumeFromDurableQueueAfterServerRestart() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 98b6edc302..0a47c3a48c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -237,6 +237,7 @@ public class HangConsumerTest extends ServiceTestBase final SimpleString address, final SimpleString name, final Filter filter, + final SimpleString user, final PageSubscription pageSubscription, final boolean durable, final boolean temporary, @@ -252,6 +253,7 @@ public class HangConsumerTest extends ServiceTestBase name, filter, pageSubscription, + user, durable, temporary, autoCreated, @@ -293,6 +295,7 @@ public class HangConsumerTest extends ServiceTestBase final SimpleString name, final Filter filter, final PageSubscription pageSubscription, + final SimpleString user, final boolean durable, final boolean temporary, final boolean autoCreated) @@ -301,6 +304,7 @@ public class HangConsumerTest extends ServiceTestBase address, name, filter, + user, pageSubscription, durable, temporary, @@ -403,7 +407,7 @@ public class HangConsumerTest extends ServiceTestBase // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally - LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, true, false, false, null, null, null, null, null), server.getNodeID()); + LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID()); server.getStorageManager().addQueueBinding(txID, newBinding); server.getStorageManager().commitBindings(txID); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index 4e3d51923e..22b30a4870 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -498,6 +498,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase SimpleString address, SimpleString name, Filter filter, + SimpleString user, PageSubscription pageSubscription, boolean durable, boolean temporary, @@ -513,6 +514,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase name, filter, pageSubscription, + user, durable, temporary, autoCreated, @@ -566,6 +568,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase SimpleString name, Filter filter, PageSubscription pageSubscription, + SimpleString user, boolean durable, boolean temporary, boolean autoCreated) @@ -575,6 +578,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase address, name, filter, + user, pageSubscription, durable, temporary, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java index a6ab6afa05..8c4ed26f6e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java @@ -80,7 +80,7 @@ public class TopicCleanupTest extends JMSTestBase { long txid = storage.generateID(); - final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false, false, server.getScheduledPool(), server.getPostOffice(), + final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor()); LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java new file mode 100644 index 0000000000..08d193c99c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.server; + +import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; +import org.apache.activemq.artemis.tests.util.UnitTestCase; +import org.junit.Before; +import org.junit.Test; + +public class ResourceLimitTest extends UnitTestCase +{ + private ActiveMQServer server; + + private TransportConfiguration liveTC; + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + + ResourceLimitSettings resourceLimitSettings = new ResourceLimitSettings(); + resourceLimitSettings.setMatch(SimpleString.toSimpleString("myUser")); + resourceLimitSettings.setMaxConnections(1); + resourceLimitSettings.setMaxQueues(1); + + Configuration configuration = createBasicConfig() + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)) + .addResourceLimitSettings(resourceLimitSettings); + + server = addServer(ActiveMQServers.newActiveMQServer(configuration, false)); + server.start(); + } + + @Test + public void testSessionLimitForUser() throws Exception + { + ServerLocator locator = addServerLocator(createNonHALocator(false)); + ClientSessionFactory clientSessionFactory = locator.createSessionFactory(); + ClientSession clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0); + + try + { + ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory(); + ClientSession extraClientSession = extraClientSessionFactory.createSession("myUser", "password", false, true, true, false, 0); + fail("creating a session factory here should fail"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQSessionCreationException); + } + + clientSession.close(); + + clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0); + + try + { + ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory(); + ClientSession extraClientSession = extraClientSessionFactory.createSession("myUser", "password", false, true, true, false, 0); + fail("creating a session factory here should fail"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQSessionCreationException); + } + } + + @Test + public void testQueueLimitForUser() throws Exception + { + ServerLocator locator = addServerLocator(createNonHALocator(false)); + ClientSessionFactory clientSessionFactory = locator.createSessionFactory(); + ClientSession clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0); + clientSession.createQueue("address", "queue"); + + try + { + clientSession.createQueue("address", "anotherQueue"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQSessionCreationException); + } + + clientSession.deleteQueue("queue"); + + clientSession.createQueue("address", "queue"); + + try + { + clientSession.createQueue("address", "anotherQueue"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQSessionCreationException); + } + + try + { + clientSession.createSharedQueue(SimpleString.toSimpleString("address"), SimpleString.toSimpleString("anotherQueue"), false); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQSessionCreationException); + } + } +} \ No newline at end of file diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java index 8f71002319..b57903ca0b 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java @@ -73,6 +73,7 @@ public class QueueConcurrentTest extends UnitTestCase new SimpleString("queue1"), null, null, + null, false, false, false); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index da48d7770b..2bb977087a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -622,4 +622,10 @@ public class FakeQueue implements Queue { return 0.0f; } + + @Override + public SimpleString getUser() + { + return null; + } } \ No newline at end of file diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index 5dc4cff664..add2da0e52 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -1410,6 +1410,7 @@ public class QueueImplTest extends UnitTestCase QueueImplTest.address1, name, filter, + null, durable, temporary, false, diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java index 145f4b279d..c08a4aac91 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java @@ -41,6 +41,7 @@ public class FakeQueueFactory implements QueueFactory final SimpleString name, final Filter filter, final PageSubscription subscription, + final SimpleString user, final boolean durable, final boolean temporary, final boolean autoCreated) @@ -50,6 +51,7 @@ public class FakeQueueFactory implements QueueFactory name, filter, subscription, + user, durable, temporary, autoCreated,