This closes #222 on resource limits
This commit is contained in:
commit
bebd8093c0
|
@ -20,15 +20,15 @@ under the License.
|
|||
|
||||
<configuration xmlns="urn:activemq">
|
||||
<core xmlns="urn:activemq:core">
|
||||
<paging-directory>${data.dir:../data}/paging</paging-directory>
|
||||
<paging-directory>./target/paging</paging-directory>
|
||||
|
||||
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
|
||||
<bindings-directory>./target/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>${data.dir:../data}/journal</journal-directory>
|
||||
<journal-directory>./target/journal</journal-directory>
|
||||
|
||||
<journal-min-files>10</journal-min-files>
|
||||
|
||||
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
|
||||
<large-messages-directory>./target/large-messages</large-messages-directory>
|
||||
|
||||
<connectors>
|
||||
<!-- Default Connector. Returned to clients during broadcast and distributed around cluster. See broadcast and discovery-groups -->
|
||||
|
|
|
@ -24,15 +24,15 @@ under the License.
|
|||
<queue name="ExpiryQueue"/>
|
||||
</jms>
|
||||
<core xmlns="urn:activemq:core">
|
||||
<paging-directory>${data.dir:../data}/paging</paging-directory>
|
||||
<paging-directory>./target/paging</paging-directory>
|
||||
|
||||
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
|
||||
<bindings-directory>./target/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>${data.dir:../data}/journal</journal-directory>
|
||||
<journal-directory>./target/journal</journal-directory>
|
||||
|
||||
<journal-min-files>10</journal-min-files>
|
||||
|
||||
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
|
||||
<large-messages-directory>./target/large-messages</large-messages-directory>
|
||||
|
||||
<connectors>
|
||||
<!-- Default Connector. Returned to clients during broadcast and distributed around cluster. See broadcast and discovery-groups -->
|
||||
|
|
|
@ -191,6 +191,10 @@ public class TransportConstants
|
|||
|
||||
public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1;
|
||||
|
||||
public static final String CONNECTIONS_ALLOWED = "connectionsAllowed";
|
||||
|
||||
public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
|
||||
|
||||
static
|
||||
{
|
||||
Set<String> allowableAcceptorKeys = new HashSet<String>();
|
||||
|
@ -224,6 +228,7 @@ public class TransportConstants
|
|||
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
|
||||
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
|
||||
allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID);
|
||||
allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
|
||||
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
|
||||
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
|
||||
|
||||
|
|
|
@ -664,8 +664,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
|
|||
AMQServerSession fakeSession = new AMQServerSession(user, pass);
|
||||
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
|
||||
((ActiveMQServerImpl) server).getSecurityStore().check(qName, checkType, fakeSession);
|
||||
|
||||
((ActiveMQServerImpl) server).checkQueueCreationLimit(user);
|
||||
}
|
||||
this.server.createQueue(qName, qName, null, false, true);
|
||||
this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, true);
|
||||
if (dest.isTemporary())
|
||||
{
|
||||
connection.registerTempQueue(qName);
|
||||
|
|
|
@ -343,7 +343,7 @@ public class AMQServerSession extends ServerSessionImpl
|
|||
return;
|
||||
}
|
||||
|
||||
server.createQueue(address, name, filterString, durable, temporary);
|
||||
server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
|
||||
|
||||
if (temporary)
|
||||
{
|
||||
|
|
|
@ -262,7 +262,7 @@ public final class StompConnection implements RemotingConnection
|
|||
SimpleString queueName = new SimpleString(queue);
|
||||
try
|
||||
{
|
||||
manager.getServer().createQueue(queueName, queueName, null, true, false, true);
|
||||
manager.getServer().createQueue(queueName, queueName, SimpleString.toSimpleString(this.getLogin()), null, true, false, true);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.security.Role;
|
|||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
|
||||
/**
|
||||
* A Configuration is used to configure ActiveMQ servers.
|
||||
|
@ -73,6 +74,21 @@ public interface Configuration
|
|||
*/
|
||||
Configuration setPersistenceEnabled(boolean enable);
|
||||
|
||||
/**
|
||||
* @return usernames mapped to ResourceLimitSettings
|
||||
*/
|
||||
Map<String, ResourceLimitSettings> getResourceLimitSettings();
|
||||
|
||||
/**
|
||||
* @param resourceLimitSettings usernames mapped to ResourceLimitSettings
|
||||
*/
|
||||
Configuration setResourceLimitSettings(Map<String, ResourceLimitSettings> resourceLimitSettings);
|
||||
|
||||
/**
|
||||
* @param resourceLimitSettings usernames mapped to ResourceLimitSettings
|
||||
*/
|
||||
Configuration addResourceLimitSettings(ResourceLimitSettings resourceLimitSettings);
|
||||
|
||||
/**
|
||||
* Returns the period (in milliseconds) to scan configuration files used by deployment. <br>
|
||||
* Default value is {@value org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_FILE_DEPLOYER_SCAN_PERIOD}.
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.security.Role;
|
|||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
|
||||
public class ConfigurationImpl implements Configuration, Serializable
|
||||
{
|
||||
|
@ -200,6 +201,8 @@ public class ConfigurationImpl implements Configuration, Serializable
|
|||
|
||||
private Map<String, AddressSettings> addressesSettings = new HashMap<String, AddressSettings>();
|
||||
|
||||
private Map<String, ResourceLimitSettings> resourceLimitSettings = new HashMap<String, ResourceLimitSettings>();
|
||||
|
||||
private Map<String, Set<Role>> securitySettings = new HashMap<String, Set<Role>>();
|
||||
|
||||
protected List<ConnectorServiceConfiguration> connectorServiceConfigurations = new ArrayList<ConnectorServiceConfiguration>();
|
||||
|
@ -1023,6 +1026,26 @@ public class ConfigurationImpl implements Configuration, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ResourceLimitSettings> getResourceLimitSettings()
|
||||
{
|
||||
return resourceLimitSettings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationImpl setResourceLimitSettings(final Map<String, ResourceLimitSettings> resourceLimitSettings)
|
||||
{
|
||||
this.resourceLimitSettings = resourceLimitSettings;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationImpl addResourceLimitSettings(ResourceLimitSettings resourceLimitSettings)
|
||||
{
|
||||
this.resourceLimitSettings.put(resourceLimitSettings.getMatch().toString(), resourceLimitSettings);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Set<Role>> getSecurityRoles()
|
||||
{
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
|
|||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
|
||||
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
|
||||
|
@ -147,6 +148,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
|||
|
||||
private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues";
|
||||
|
||||
private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections";
|
||||
|
||||
private static final String MAX_QUEUES_NODE_NAME = "max-queues";
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
private boolean validateAIO = false;
|
||||
|
@ -611,6 +616,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
|||
|
||||
parseAddressSettings(e, config);
|
||||
|
||||
parseResourceLimits(e, config);
|
||||
|
||||
parseQueues(e, config);
|
||||
|
||||
parseSecurity(e, config);
|
||||
|
@ -689,6 +696,25 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param e
|
||||
* @param config
|
||||
*/
|
||||
private void parseResourceLimits(final Element e, final Configuration config)
|
||||
{
|
||||
NodeList elements = e.getElementsByTagName("resource-limit-settings");
|
||||
|
||||
if (elements.getLength() != 0)
|
||||
{
|
||||
Element node = (Element) elements.item(0);
|
||||
NodeList list = node.getElementsByTagName("resource-limit-setting");
|
||||
for (int i = 0; i < list.getLength(); i++)
|
||||
{
|
||||
config.addResourceLimitSettings(parseResourceLimitSettings(list.item(i)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param node
|
||||
* @return
|
||||
|
@ -903,6 +929,34 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
|||
return setting;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param node
|
||||
* @return
|
||||
*/
|
||||
protected ResourceLimitSettings parseResourceLimitSettings(final Node node)
|
||||
{
|
||||
ResourceLimitSettings resourceLimitSettings = new ResourceLimitSettings();
|
||||
|
||||
resourceLimitSettings.setMatch(SimpleString.toSimpleString(getAttributeValue(node, "match")));
|
||||
|
||||
NodeList children = node.getChildNodes();
|
||||
|
||||
for (int i = 0; i < children.getLength(); i++)
|
||||
{
|
||||
final Node child = children.item(i);
|
||||
final String name = child.getNodeName();
|
||||
if (MAX_CONNECTIONS_NODE_NAME.equalsIgnoreCase(name))
|
||||
{
|
||||
resourceLimitSettings.setMaxConnections(XMLUtil.parseInt(child));
|
||||
}
|
||||
else if (MAX_QUEUES_NODE_NAME.equalsIgnoreCase(name))
|
||||
{
|
||||
resourceLimitSettings.setMaxQueues(XMLUtil.parseInt(child));
|
||||
}
|
||||
}
|
||||
return resourceLimitSettings;
|
||||
}
|
||||
|
||||
protected CoreQueueConfiguration parseQueueConfiguration(final Node node)
|
||||
{
|
||||
String name = getAttributeValue(node, "name");
|
||||
|
|
|
@ -36,4 +36,6 @@ public interface QueueBindingInfo
|
|||
|
||||
boolean isAutoCreated();
|
||||
|
||||
SimpleString getUser();
|
||||
|
||||
}
|
||||
|
|
|
@ -2027,6 +2027,7 @@ public class JournalStorageManager implements StorageManager
|
|||
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
|
||||
binding.getAddress(),
|
||||
filterString,
|
||||
queue.getUser(),
|
||||
queue.isAutoCreated());
|
||||
|
||||
readLock();
|
||||
|
@ -3045,6 +3046,8 @@ public class JournalStorageManager implements StorageManager
|
|||
|
||||
public boolean autoCreated;
|
||||
|
||||
public SimpleString user;
|
||||
|
||||
public PersistentQueueBindingEncoding()
|
||||
{
|
||||
}
|
||||
|
@ -3059,6 +3062,8 @@ public class JournalStorageManager implements StorageManager
|
|||
address +
|
||||
", filterString=" +
|
||||
filterString +
|
||||
", user=" +
|
||||
user +
|
||||
", autoCreated=" +
|
||||
autoCreated +
|
||||
"]";
|
||||
|
@ -3067,11 +3072,13 @@ public class JournalStorageManager implements StorageManager
|
|||
public PersistentQueueBindingEncoding(final SimpleString name,
|
||||
final SimpleString address,
|
||||
final SimpleString filterString,
|
||||
final SimpleString user,
|
||||
final boolean autoCreated)
|
||||
{
|
||||
this.name = name;
|
||||
this.address = address;
|
||||
this.filterString = filterString;
|
||||
this.user = user;
|
||||
this.autoCreated = autoCreated;
|
||||
}
|
||||
|
||||
|
@ -3105,6 +3112,11 @@ public class JournalStorageManager implements StorageManager
|
|||
return name;
|
||||
}
|
||||
|
||||
public SimpleString getUser()
|
||||
{
|
||||
return user;
|
||||
}
|
||||
|
||||
public boolean isAutoCreated()
|
||||
{
|
||||
return autoCreated;
|
||||
|
@ -3115,6 +3127,24 @@ public class JournalStorageManager implements StorageManager
|
|||
name = buffer.readSimpleString();
|
||||
address = buffer.readSimpleString();
|
||||
filterString = buffer.readNullableSimpleString();
|
||||
|
||||
String metadata = buffer.readNullableSimpleString().toString();
|
||||
if (metadata != null)
|
||||
{
|
||||
String[] elements = metadata.split(";");
|
||||
for (String element : elements)
|
||||
{
|
||||
String[] keyValuePair = element.split("=");
|
||||
if (keyValuePair.length == 2)
|
||||
{
|
||||
if (keyValuePair[0].equals("user"))
|
||||
{
|
||||
user = SimpleString.toSimpleString(keyValuePair[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
autoCreated = buffer.readBoolean();
|
||||
}
|
||||
|
||||
|
@ -3123,13 +3153,22 @@ public class JournalStorageManager implements StorageManager
|
|||
buffer.writeSimpleString(name);
|
||||
buffer.writeSimpleString(address);
|
||||
buffer.writeNullableSimpleString(filterString);
|
||||
buffer.writeNullableSimpleString(createMetadata());
|
||||
buffer.writeBoolean(autoCreated);
|
||||
}
|
||||
|
||||
public int getEncodeSize()
|
||||
{
|
||||
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
|
||||
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN;
|
||||
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
|
||||
SimpleString.sizeofNullableString(createMetadata());
|
||||
}
|
||||
|
||||
private SimpleString createMetadata()
|
||||
{
|
||||
StringBuilder metadata = new StringBuilder();
|
||||
metadata.append("user=").append(user).append(";");
|
||||
return SimpleString.toSimpleString(metadata.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -64,6 +64,8 @@ public final class InVMAcceptor implements Acceptor
|
|||
|
||||
private ActiveMQPrincipal defaultActiveMQPrincipal;
|
||||
|
||||
private final long connectionsAllowed;
|
||||
|
||||
public InVMAcceptor(final ClusterConnection clusterConnection,
|
||||
final Map<String, Object> configuration,
|
||||
final BufferHandler handler,
|
||||
|
@ -81,6 +83,10 @@ public final class InVMAcceptor implements Acceptor
|
|||
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
|
||||
|
||||
executorFactory = new OrderedExecutorFactory(threadPool);
|
||||
|
||||
connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED,
|
||||
TransportConstants.DEFAULT_CONNECTIONS_ALLOWED,
|
||||
configuration);
|
||||
}
|
||||
|
||||
public Map<String, Object> getConfiguration()
|
||||
|
@ -93,6 +99,16 @@ public final class InVMAcceptor implements Acceptor
|
|||
return clusterConnection;
|
||||
}
|
||||
|
||||
public long getConnectionsAllowed()
|
||||
{
|
||||
return connectionsAllowed;
|
||||
}
|
||||
|
||||
public int getConnectionCount()
|
||||
{
|
||||
return connections.size();
|
||||
}
|
||||
|
||||
public synchronized void start() throws Exception
|
||||
{
|
||||
if (started)
|
||||
|
|
|
@ -154,12 +154,21 @@ public class InVMConnector extends AbstractConnector
|
|||
return null;
|
||||
}
|
||||
|
||||
Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory()
|
||||
.getExecutor());
|
||||
if (acceptor.getConnectionsAllowed() == -1 || acceptor.getConnectionCount() < acceptor.getConnectionsAllowed())
|
||||
{
|
||||
Connection conn = internalCreateConnection(acceptor.getHandler(), new Listener(), acceptor.getExecutorFactory().getExecutor());
|
||||
|
||||
acceptor.connect((String)conn.getID(), handler, this, executorFactory.getExecutor());
|
||||
|
||||
return conn;
|
||||
acceptor.connect((String) conn.getID(), handler, this, executorFactory.getExecutor());
|
||||
return conn;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(acceptor.getConnectionsAllowed()).append(" reached. Refusing connection."));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void start()
|
||||
|
|
|
@ -23,6 +23,10 @@ public final class TransportConstants
|
|||
|
||||
public static final int DEFAULT_SERVER_ID = 0;
|
||||
|
||||
public static final String CONNECTIONS_ALLOWED = "connectionsAllowed";
|
||||
|
||||
public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
|
||||
|
||||
private TransportConstants()
|
||||
{
|
||||
// Utility class
|
||||
|
|
|
@ -168,6 +168,8 @@ public class NettyAcceptor implements Acceptor
|
|||
|
||||
private final boolean httpUpgradeEnabled;
|
||||
|
||||
private final long connectionsAllowed;
|
||||
|
||||
public NettyAcceptor(final String name,
|
||||
final ClusterConnection clusterConnection,
|
||||
final Map<String, Object> configuration,
|
||||
|
@ -288,6 +290,10 @@ public class NettyAcceptor implements Acceptor
|
|||
httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME,
|
||||
TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED,
|
||||
configuration);
|
||||
|
||||
connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED,
|
||||
TransportConstants.DEFAULT_CONNECTIONS_ALLOWED,
|
||||
configuration);
|
||||
}
|
||||
|
||||
public synchronized void start() throws Exception
|
||||
|
@ -711,36 +717,47 @@ public class NettyAcceptor implements Acceptor
|
|||
|
||||
public NettyServerConnection createConnection(final ChannelHandlerContext ctx, String protocol, boolean httpEnabled) throws Exception
|
||||
{
|
||||
super.channelActive(ctx);
|
||||
Listener connectionListener = new Listener();
|
||||
|
||||
NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver);
|
||||
|
||||
connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol);
|
||||
|
||||
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
|
||||
if (sslHandler != null)
|
||||
if (connectionsAllowed == -1 || connections.size() < connectionsAllowed)
|
||||
{
|
||||
sslHandler.handshakeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>()
|
||||
super.channelActive(ctx);
|
||||
Listener connectionListener = new Listener();
|
||||
|
||||
NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver);
|
||||
|
||||
connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol);
|
||||
|
||||
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
|
||||
if (sslHandler != null)
|
||||
{
|
||||
public void operationComplete(final io.netty.util.concurrent.Future<Channel> future) throws Exception
|
||||
sslHandler.handshakeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Channel>>()
|
||||
{
|
||||
if (future.isSuccess())
|
||||
public void operationComplete(final io.netty.util.concurrent.Future<Channel> future) throws Exception
|
||||
{
|
||||
active = true;
|
||||
if (future.isSuccess())
|
||||
{
|
||||
active = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
future.getNow().close();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
future.getNow().close();
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
active = true;
|
||||
}
|
||||
return nc;
|
||||
}
|
||||
else
|
||||
{
|
||||
active = true;
|
||||
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
|
||||
{
|
||||
ActiveMQServerLogger.LOGGER.debug(new StringBuilder().append("Connection limit of ").append(connectionsAllowed).append(" reached. Refusing connection from ").append(ctx.channel().remoteAddress()));
|
||||
}
|
||||
throw new Exception();
|
||||
}
|
||||
return nc;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -353,4 +353,10 @@ public interface ActiveMQMessageBundle
|
|||
|
||||
@Message(id = 119109, value = "unsupported HA Policy Configuration {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
ActiveMQIllegalStateException unsupportedHAPolicyConfiguration(Object o);
|
||||
|
||||
@Message(id = 119110, value = "Too many sessions for user ''{0}''. Sessions allowed: {1}.", format = Message.Format.MESSAGE_FORMAT)
|
||||
ActiveMQSessionCreationException sessionLimitReached(String username, int limit);
|
||||
|
||||
@Message(id = 119111, value = "Too many queues created by user ''{0}''. Queues allowed: {1}.", format = Message.Format.MESSAGE_FORMAT)
|
||||
ActiveMQSessionCreationException queueLimitReached(String username, int limit);
|
||||
}
|
||||
|
|
|
@ -181,6 +181,7 @@ public interface ActiveMQServer extends ActiveMQComponent
|
|||
void createSharedQueue(final SimpleString address,
|
||||
final SimpleString name,
|
||||
final SimpleString filterString,
|
||||
final SimpleString user,
|
||||
boolean durable) throws Exception;
|
||||
|
||||
Queue createQueue(SimpleString address,
|
||||
|
@ -192,6 +193,14 @@ public interface ActiveMQServer extends ActiveMQComponent
|
|||
Queue createQueue(SimpleString address,
|
||||
SimpleString queueName,
|
||||
SimpleString filter,
|
||||
SimpleString user,
|
||||
boolean durable,
|
||||
boolean temporary) throws Exception;
|
||||
|
||||
Queue createQueue(SimpleString address,
|
||||
SimpleString queueName,
|
||||
SimpleString filter,
|
||||
SimpleString user,
|
||||
boolean durable,
|
||||
boolean temporary,
|
||||
boolean autoCreated) throws Exception;
|
||||
|
|
|
@ -235,4 +235,9 @@ public interface Queue extends Bindable
|
|||
void postAcknowledge(MessageReference ref);
|
||||
|
||||
float getRate();
|
||||
|
||||
/**
|
||||
* @return the user who created this queue
|
||||
*/
|
||||
SimpleString getUser();
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ public interface QueueFactory
|
|||
SimpleString name,
|
||||
Filter filter,
|
||||
PageSubscription pageSubscription,
|
||||
SimpleString user,
|
||||
boolean durable,
|
||||
boolean temporary,
|
||||
boolean autoCreated);
|
||||
|
|
|
@ -117,6 +117,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
|||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
|
||||
import org.apache.activemq.artemis.core.version.Version;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||
|
@ -1045,6 +1046,9 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
{
|
||||
securityStore.authenticate(username, password);
|
||||
}
|
||||
|
||||
checkSessionLimit(username);
|
||||
|
||||
final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
|
||||
final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize,
|
||||
connection, autoCommitSends, autoCommitAcks, preAcknowledge,
|
||||
|
@ -1055,6 +1059,73 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
return session;
|
||||
}
|
||||
|
||||
private void checkSessionLimit(String username) throws Exception
|
||||
{
|
||||
if (configuration.getResourceLimitSettings() != null && configuration.getResourceLimitSettings().containsKey(username))
|
||||
{
|
||||
ResourceLimitSettings limits = configuration.getResourceLimitSettings().get(username);
|
||||
|
||||
if (limits.getMaxConnections() == -1)
|
||||
{
|
||||
return;
|
||||
}
|
||||
else if (limits.getMaxConnections() == 0 || getSessionCountForUser(username) >= limits.getMaxConnections())
|
||||
{
|
||||
throw ActiveMQMessageBundle.BUNDLE.sessionLimitReached(username, limits.getMaxConnections());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int getSessionCountForUser(String username)
|
||||
{
|
||||
int sessionCount = 0;
|
||||
|
||||
for (Entry<String, ServerSession> sessionEntry : sessions.entrySet())
|
||||
{
|
||||
if (sessionEntry.getValue().getUsername().toString().equals(username))
|
||||
{
|
||||
sessionCount++;
|
||||
}
|
||||
}
|
||||
|
||||
return sessionCount;
|
||||
}
|
||||
|
||||
public void checkQueueCreationLimit(String username) throws Exception
|
||||
{
|
||||
if (configuration.getResourceLimitSettings() != null && configuration.getResourceLimitSettings().containsKey(username))
|
||||
{
|
||||
ResourceLimitSettings limits = configuration.getResourceLimitSettings().get(username);
|
||||
|
||||
if (limits.getMaxQueues() == -1)
|
||||
{
|
||||
return;
|
||||
}
|
||||
else if (limits.getMaxQueues() == 0 || getQueueCountForUser(username) >= limits.getMaxQueues())
|
||||
{
|
||||
throw ActiveMQMessageBundle.BUNDLE.queueLimitReached(username, limits.getMaxConnections());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public int getQueueCountForUser(String username) throws Exception
|
||||
{
|
||||
Map<SimpleString, Binding> bindings = postOffice.getAllBindings();
|
||||
|
||||
int queuesForUser = 0;
|
||||
|
||||
for (Binding binding : bindings.values())
|
||||
{
|
||||
if (binding instanceof LocalQueueBinding && ((LocalQueueBinding) binding).getQueue().getUser().equals(SimpleString.toSimpleString(username)))
|
||||
{
|
||||
queuesForUser++;
|
||||
}
|
||||
}
|
||||
|
||||
return queuesForUser;
|
||||
|
||||
}
|
||||
|
||||
protected ServerSessionImpl internalCreateSession(String name, String username,
|
||||
String password, int minLargeMessageSize,
|
||||
RemotingConnection connection, boolean autoCommitSends,
|
||||
|
@ -1207,17 +1278,28 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
final boolean durable,
|
||||
final boolean temporary) throws Exception
|
||||
{
|
||||
return createQueue(address, queueName, filterString, durable, temporary, false, false, false);
|
||||
return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false);
|
||||
}
|
||||
|
||||
public Queue createQueue(final SimpleString address,
|
||||
final SimpleString queueName,
|
||||
final SimpleString filterString,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary) throws Exception
|
||||
{
|
||||
return createQueue(address, queueName, filterString, user, durable, temporary, false, false, false);
|
||||
}
|
||||
|
||||
public Queue createQueue(final SimpleString address,
|
||||
final SimpleString queueName,
|
||||
final SimpleString filterString,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated) throws Exception
|
||||
{
|
||||
return createQueue(address, queueName, filterString, durable, temporary, false, false, autoCreated);
|
||||
return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1235,9 +1317,10 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
public void createSharedQueue(final SimpleString address,
|
||||
final SimpleString name,
|
||||
final SimpleString filterString,
|
||||
final SimpleString user,
|
||||
boolean durable) throws Exception
|
||||
{
|
||||
Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable, false);
|
||||
Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false);
|
||||
|
||||
if (!queue.getAddress().equals(address))
|
||||
{
|
||||
|
@ -1286,7 +1369,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
{
|
||||
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
|
||||
|
||||
return createQueue(address, queueName, filterString, durable, temporary, true, false, false);
|
||||
return createQueue(address, queueName, filterString, null, durable, temporary, true, false, false);
|
||||
}
|
||||
|
||||
public void destroyQueue(final SimpleString queueName) throws Exception
|
||||
|
@ -1960,6 +2043,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
private Queue createQueue(final SimpleString address,
|
||||
final SimpleString queueName,
|
||||
final SimpleString filterString,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean ignoreIfExists,
|
||||
|
@ -2003,6 +2087,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
queueName,
|
||||
filter,
|
||||
pageSubscription,
|
||||
user,
|
||||
durable,
|
||||
temporary,
|
||||
autoCreated);
|
||||
|
|
|
@ -51,6 +51,7 @@ public class LastValueQueue extends QueueImpl
|
|||
final SimpleString name,
|
||||
final Filter filter,
|
||||
final PageSubscription pageSubscription,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated,
|
||||
|
@ -65,6 +66,7 @@ public class LastValueQueue extends QueueImpl
|
|||
name,
|
||||
filter,
|
||||
pageSubscription,
|
||||
user,
|
||||
durable,
|
||||
temporary,
|
||||
autoCreated,
|
||||
|
|
|
@ -155,6 +155,7 @@ public class PostOfficeJournalLoader implements JournalLoader
|
|||
queueBindingInfo.getQueueName(),
|
||||
filter,
|
||||
subscription,
|
||||
queueBindingInfo.getUser(),
|
||||
true,
|
||||
false,
|
||||
queueBindingInfo.isAutoCreated());
|
||||
|
|
|
@ -70,6 +70,7 @@ public class QueueFactoryImpl implements QueueFactory
|
|||
final SimpleString name,
|
||||
final Filter filter,
|
||||
final PageSubscription pageSubscription,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated)
|
||||
|
@ -84,6 +85,7 @@ public class QueueFactoryImpl implements QueueFactory
|
|||
name,
|
||||
filter,
|
||||
pageSubscription,
|
||||
user,
|
||||
durable,
|
||||
temporary,
|
||||
autoCreated,
|
||||
|
@ -100,6 +102,7 @@ public class QueueFactoryImpl implements QueueFactory
|
|||
name,
|
||||
filter,
|
||||
pageSubscription,
|
||||
user,
|
||||
durable,
|
||||
temporary,
|
||||
autoCreated,
|
||||
|
|
|
@ -119,6 +119,8 @@ public class QueueImpl implements Queue
|
|||
|
||||
private final SimpleString name;
|
||||
|
||||
private final SimpleString user;
|
||||
|
||||
private volatile Filter filter;
|
||||
|
||||
private final boolean durable;
|
||||
|
@ -309,6 +311,7 @@ public class QueueImpl implements Queue
|
|||
final SimpleString address,
|
||||
final SimpleString name,
|
||||
final Filter filter,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated,
|
||||
|
@ -323,6 +326,7 @@ public class QueueImpl implements Queue
|
|||
name,
|
||||
filter,
|
||||
null,
|
||||
user,
|
||||
durable,
|
||||
temporary,
|
||||
autoCreated,
|
||||
|
@ -338,6 +342,7 @@ public class QueueImpl implements Queue
|
|||
final SimpleString name,
|
||||
final Filter filter,
|
||||
final PageSubscription pageSubscription,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated,
|
||||
|
@ -395,6 +400,7 @@ public class QueueImpl implements Queue
|
|||
|
||||
this.executor = executor;
|
||||
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
// Bindable implementation -------------------------------------------------------------------------------------
|
||||
|
@ -409,6 +415,11 @@ public class QueueImpl implements Queue
|
|||
return name;
|
||||
}
|
||||
|
||||
public SimpleString getUser()
|
||||
{
|
||||
return user;
|
||||
}
|
||||
|
||||
public boolean isExclusive()
|
||||
{
|
||||
return false;
|
||||
|
|
|
@ -559,14 +559,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener
|
|||
securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
||||
}
|
||||
|
||||
((ActiveMQServerImpl)server).checkQueueCreationLimit(getUsername());
|
||||
|
||||
// any non-temporary JMS queue created via this method should be marked as auto-created
|
||||
if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name))
|
||||
{
|
||||
server.createQueue(address, name, filterString, durable, temporary, true);
|
||||
server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
server.createQueue(address, name, filterString, durable, temporary);
|
||||
server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
|
||||
}
|
||||
|
||||
if (temporary)
|
||||
|
@ -602,7 +604,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener
|
|||
{
|
||||
securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
||||
|
||||
server.createSharedQueue(address, name, filterString, durable);
|
||||
((ActiveMQServerImpl)server).checkQueueCreationLimit(getUsername());
|
||||
|
||||
server.createSharedQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable);
|
||||
}
|
||||
|
||||
public RemotingConnection getRemotingConnection()
|
||||
|
|
|
@ -0,0 +1,173 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.settings.impl;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.utils.BufferHelper;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class ResourceLimitSettings implements Serializable, EncodingSupport
|
||||
{
|
||||
private static final long serialVersionUID = -110638321333856932L;
|
||||
|
||||
public static final SimpleString DEFAULT_MATCH = null;
|
||||
|
||||
public static final Integer DEFAULT_MAX_CONNECTIONS = -1;
|
||||
|
||||
public static final Integer DEFAULT_MAX_QUEUES = -1;
|
||||
|
||||
// public static final Long DEFAULT_MAX_QUEUE_SIZE_BYTES = -1L;
|
||||
|
||||
// public static final SimpleString DEFAULT_QUEUE_NAME_REGEX = new SimpleString(".+");
|
||||
|
||||
SimpleString match = null;
|
||||
|
||||
Integer maxConnections = null;
|
||||
|
||||
Integer maxQueues = null;
|
||||
|
||||
// Long maxQueueSizeBytes = null;
|
||||
|
||||
// SimpleString queueNameRegex = null;
|
||||
|
||||
public SimpleString getMatch()
|
||||
{
|
||||
return match != null ? match : DEFAULT_MATCH;
|
||||
}
|
||||
|
||||
public int getMaxConnections()
|
||||
{
|
||||
return maxConnections != null ? maxConnections : DEFAULT_MAX_CONNECTIONS;
|
||||
}
|
||||
|
||||
public int getMaxQueues()
|
||||
{
|
||||
return maxQueues != null ? maxQueues : DEFAULT_MAX_QUEUES;
|
||||
}
|
||||
|
||||
// public long getMaxQueueSizeBytes()
|
||||
// {
|
||||
// return maxQueueSizeBytes != null ? maxQueueSizeBytes : DEFAULT_MAX_QUEUE_SIZE_BYTES;
|
||||
// }
|
||||
//
|
||||
// public SimpleString getQueueNameRegex()
|
||||
// {
|
||||
// return queueNameRegex != null ? queueNameRegex : DEFAULT_QUEUE_NAME_REGEX;
|
||||
// }
|
||||
|
||||
public void setMatch(SimpleString match)
|
||||
{
|
||||
this.match = match;
|
||||
}
|
||||
|
||||
public void setMaxConnections(int maxConnections)
|
||||
{
|
||||
this.maxConnections = maxConnections;
|
||||
}
|
||||
|
||||
public void setMaxQueues(int maxQueues)
|
||||
{
|
||||
this.maxQueues = maxQueues;
|
||||
}
|
||||
|
||||
// public void setMaxQueueSizeBytes(long maxQueueSizeBytes)
|
||||
// {
|
||||
// this.maxQueueSizeBytes = maxQueueSizeBytes;
|
||||
// }
|
||||
//
|
||||
// public void setQueueNameRegex(SimpleString queueNameRegex)
|
||||
// {
|
||||
// this.queueNameRegex = queueNameRegex;
|
||||
// }
|
||||
|
||||
@Override
|
||||
public int getEncodeSize()
|
||||
{
|
||||
return SimpleString.sizeofNullableString(match) +
|
||||
BufferHelper.sizeOfNullableInteger(maxConnections) +
|
||||
BufferHelper.sizeOfNullableInteger(maxQueues);
|
||||
// BufferHelper.sizeOfNullableLong(maxQueueSizeBytes) +
|
||||
// SimpleString.sizeofNullableString(queueNameRegex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer)
|
||||
{
|
||||
buffer.writeNullableSimpleString(match);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, maxConnections);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, maxQueues);
|
||||
|
||||
// BufferHelper.writeNullableLong(buffer, maxQueueSizeBytes);
|
||||
|
||||
// buffer.writeNullableSimpleString(queueNameRegex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer)
|
||||
{
|
||||
match = buffer.readNullableSimpleString();
|
||||
|
||||
maxConnections = BufferHelper.readNullableInteger(buffer);
|
||||
|
||||
maxQueues = BufferHelper.readNullableInteger(buffer);
|
||||
|
||||
// maxQueueSizeBytes = BufferHelper.readNullableLong(buffer);
|
||||
|
||||
// queueNameRegex = buffer.readNullableSimpleString();
|
||||
}
|
||||
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see java.lang.Object#hashCode()
|
||||
*/
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + ((match == null) ? 0 : match.hashCode());
|
||||
result = prime * result + ((maxConnections == null) ? 0 : maxConnections.hashCode());
|
||||
result = prime * result + ((maxQueues == null) ? 0 : maxQueues.hashCode());
|
||||
// result = prime * result + ((maxQueueSizeBytes == null) ? 0 : maxQueueSizeBytes.hashCode());
|
||||
// result = prime * result + ((queueNameRegex == null) ? 0 : queueNameRegex.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see java.lang.Object#toString()
|
||||
*/
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "ResourceLimitSettings [match=" + match +
|
||||
", maxConnections=" +
|
||||
maxConnections +
|
||||
", maxQueues=" +
|
||||
maxQueues +
|
||||
// ", maxQueueSizeBytes=" +
|
||||
// maxQueueSizeBytes +
|
||||
// ", queueNameRegex=" +
|
||||
// queueNameRegex +
|
||||
"]";
|
||||
}
|
||||
}
|
|
@ -728,6 +728,19 @@
|
|||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="resource-limit-settings" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
a list of resource limit settings
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:complexType>
|
||||
<xsd:sequence>
|
||||
<xsd:element ref="resource-limit-setting" maxOccurs="unbounded" minOccurs="0"/>
|
||||
</xsd:sequence>
|
||||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="connector-services" maxOccurs="1" minOccurs="0">
|
||||
<xsd:complexType>
|
||||
<xsd:sequence>
|
||||
|
@ -2103,6 +2116,41 @@
|
|||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="resource-limit-setting">
|
||||
<xsd:complexType>
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Complex type element to configure resource limits for a particular user.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:all>
|
||||
<xsd:element name="max-connections" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
how many connections are allowed by the matched entity (-1 means no limit, default is -1)
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="max-queues" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
how many queues can be created by the matched entity (-1 means no limit, default is -1)
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:all>
|
||||
|
||||
<xsd:attribute name="match" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
the name of the user to whom the limits should be applied
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="filter">
|
||||
<xsd:complexType>
|
||||
<xsd:annotation>
|
||||
|
|
|
@ -315,6 +315,9 @@ public class FileConfigurationTest extends ConfigurationImplTest
|
|||
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues());
|
||||
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues());
|
||||
|
||||
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
||||
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
||||
assertEquals(13, conf.getResourceLimitSettings().get("myUser").getMaxQueues());
|
||||
|
||||
assertEquals(2, conf.getQueueConfigurations().size());
|
||||
|
||||
|
|
|
@ -1512,5 +1512,11 @@ public class ScheduledDeliveryHandlerTest extends Assert
|
|||
{
|
||||
return 0.0f;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getUser()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -264,6 +264,12 @@
|
|||
<auto-delete-jms-queues>false</auto-delete-jms-queues>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
<resource-limit-settings>
|
||||
<resource-limit-setting match="myUser">
|
||||
<max-connections>104</max-connections>
|
||||
<max-queues>13</max-queues>
|
||||
</resource-limit-setting>
|
||||
</resource-limit-settings>
|
||||
<connector-services>
|
||||
<connector-service>
|
||||
<factory-class>org.foo</factory-class>
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
* [Extra Acknowledge Modes](pre-acknowledge.md)
|
||||
* [Management](management.md)
|
||||
* [Security](security.md)
|
||||
* [Resource Limits](resource-limits.md)
|
||||
* [The JMS Bridge](jms-bridge.md)
|
||||
* [Client Reconnection and Session Reattachment](client-reconnection.md)
|
||||
* [Diverting and Splitting Message Flows](diverts.md)
|
||||
|
|
|
@ -285,6 +285,14 @@ Netty for simple TCP:
|
|||
connector will let the system pick up an ephemeral port. valid ports
|
||||
are 0 to 65535
|
||||
|
||||
- `connectionsAllowed`. This is only valid for acceptors. It limits the
|
||||
number of connections which the acceptor will allow. When this limit
|
||||
is reached a DEBUG level message is issued to the log, and the connection
|
||||
is refused. The type of client in use will determine what happens when
|
||||
the connection is refused. In the case of a `core` client, it will
|
||||
result in a `org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException`.
|
||||
|
||||
|
||||
## Configuring Netty SSL
|
||||
|
||||
Netty SSL is similar to the Netty TCP transport but it provides
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
# Resource Limits
|
||||
|
||||
Sometimes it's helpful to set particular limits on what certain users can
|
||||
do beyond the normal security settings related to authorization and
|
||||
authentication. For example, limiting how many connections a user can create
|
||||
or how many queues a user can create. This chapter will explain how to
|
||||
configure such limits.
|
||||
|
||||
## Configuring Limits Via Resource Limit Settings
|
||||
|
||||
Here is an example of the XML used to set resource limits:
|
||||
|
||||
<resource-limit-settings>
|
||||
<resource-limit-setting match="myUser">
|
||||
<max-connections>5</max-connections>
|
||||
<max-queues>3</max-queues>
|
||||
</resource-limit-setting>
|
||||
</resource-limit-settings>
|
||||
|
||||
Unlike the `match` from `address-setting`, this `match` does not use
|
||||
any wild-card syntax. It's a simple 1:1 mapping of the limits to a user.
|
||||
|
||||
`max-connections` defines how many connections the matched user can make
|
||||
to the broker. The default is -1 which means there is no limit.
|
||||
|
||||
`max-queues` defines how many queues the matched user can create. The default
|
||||
is -1 which means there is no limit.
|
|
@ -15,6 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.junit.Before;
|
||||
|
||||
import org.junit.Test;
|
||||
|
@ -103,6 +104,26 @@ public class DurableQueueTest extends ServiceTestBase
|
|||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserEncoding() throws Exception
|
||||
{
|
||||
final String userName = "myUser";
|
||||
session.close();
|
||||
session = sf.createSession(userName, "myPass", false, true, true, false, 0);
|
||||
|
||||
SimpleString queue = RandomUtil.randomSimpleString();
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
|
||||
session.createQueue(address, queue, true);
|
||||
|
||||
session.close();
|
||||
|
||||
server.stop();
|
||||
server.start();
|
||||
|
||||
assertEquals(1, ((ActiveMQServerImpl) server).getQueueCountForUser(userName));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProduceAndConsumeFromDurableQueueAfterServerRestart() throws Exception
|
||||
{
|
||||
|
|
|
@ -237,6 +237,7 @@ public class HangConsumerTest extends ServiceTestBase
|
|||
final SimpleString address,
|
||||
final SimpleString name,
|
||||
final Filter filter,
|
||||
final SimpleString user,
|
||||
final PageSubscription pageSubscription,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
|
@ -252,6 +253,7 @@ public class HangConsumerTest extends ServiceTestBase
|
|||
name,
|
||||
filter,
|
||||
pageSubscription,
|
||||
user,
|
||||
durable,
|
||||
temporary,
|
||||
autoCreated,
|
||||
|
@ -293,6 +295,7 @@ public class HangConsumerTest extends ServiceTestBase
|
|||
final SimpleString name,
|
||||
final Filter filter,
|
||||
final PageSubscription pageSubscription,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated)
|
||||
|
@ -301,6 +304,7 @@ public class HangConsumerTest extends ServiceTestBase
|
|||
address,
|
||||
name,
|
||||
filter,
|
||||
user,
|
||||
pageSubscription,
|
||||
durable,
|
||||
temporary,
|
||||
|
@ -403,7 +407,7 @@ public class HangConsumerTest extends ServiceTestBase
|
|||
|
||||
|
||||
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
|
||||
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, true, false, false, null, null, null, null, null), server.getNodeID());
|
||||
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
|
||||
server.getStorageManager().addQueueBinding(txID, newBinding);
|
||||
server.getStorageManager().commitBindings(txID);
|
||||
|
||||
|
|
|
@ -498,6 +498,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
|
|||
SimpleString address,
|
||||
SimpleString name,
|
||||
Filter filter,
|
||||
SimpleString user,
|
||||
PageSubscription pageSubscription,
|
||||
boolean durable,
|
||||
boolean temporary,
|
||||
|
@ -513,6 +514,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
|
|||
name,
|
||||
filter,
|
||||
pageSubscription,
|
||||
user,
|
||||
durable,
|
||||
temporary,
|
||||
autoCreated,
|
||||
|
@ -566,6 +568,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
|
|||
SimpleString name,
|
||||
Filter filter,
|
||||
PageSubscription pageSubscription,
|
||||
SimpleString user,
|
||||
boolean durable,
|
||||
boolean temporary,
|
||||
boolean autoCreated)
|
||||
|
@ -575,6 +578,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
|
|||
address,
|
||||
name,
|
||||
filter,
|
||||
user,
|
||||
pageSubscription,
|
||||
durable,
|
||||
temporary,
|
||||
|
|
|
@ -80,7 +80,7 @@ public class TopicCleanupTest extends JMSTestBase
|
|||
{
|
||||
long txid = storage.generateID();
|
||||
|
||||
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false, false, server.getScheduledPool(), server.getPostOffice(),
|
||||
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(),
|
||||
storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
|
||||
|
||||
LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.server;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.tests.util.UnitTestCase;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class ConnectionLimitTest extends UnitTestCase
|
||||
{
|
||||
private ActiveMQServer server;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
super.setUp();
|
||||
|
||||
Map nettyParams = new HashMap();
|
||||
nettyParams.put(TransportConstants.CONNECTIONS_ALLOWED, 1);
|
||||
|
||||
Map invmParams = new HashMap();
|
||||
invmParams.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.CONNECTIONS_ALLOWED, 1);
|
||||
|
||||
Configuration configuration = createBasicConfig()
|
||||
.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyParams))
|
||||
.addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmParams));
|
||||
|
||||
server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
|
||||
server.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInVMConnectionLimit() throws Exception
|
||||
{
|
||||
ServerLocator locator = addServerLocator(createNonHALocator(false));
|
||||
ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
|
||||
|
||||
try
|
||||
{
|
||||
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
|
||||
fail("creating a session factory here should fail");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
assertTrue(e instanceof ActiveMQNotConnectedException);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNettyConnectionLimit() throws Exception
|
||||
{
|
||||
ServerLocator locator = addServerLocator(createNonHALocator(true));
|
||||
locator.setCallTimeout(3000);
|
||||
ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
|
||||
ClientSession clientSession = addClientSession(clientSessionFactory.createSession());
|
||||
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
|
||||
|
||||
try
|
||||
{
|
||||
ClientSession extraClientSession = addClientSession(extraClientSessionFactory.createSession());
|
||||
fail("creating a session here should fail");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
assertTrue(e instanceof ActiveMQConnectionTimedOutException);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,131 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.server;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
import org.apache.activemq.artemis.tests.util.UnitTestCase;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ResourceLimitTest extends UnitTestCase
|
||||
{
|
||||
private ActiveMQServer server;
|
||||
|
||||
private TransportConfiguration liveTC;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
super.setUp();
|
||||
|
||||
ResourceLimitSettings resourceLimitSettings = new ResourceLimitSettings();
|
||||
resourceLimitSettings.setMatch(SimpleString.toSimpleString("myUser"));
|
||||
resourceLimitSettings.setMaxConnections(1);
|
||||
resourceLimitSettings.setMaxQueues(1);
|
||||
|
||||
Configuration configuration = createBasicConfig()
|
||||
.addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY))
|
||||
.addResourceLimitSettings(resourceLimitSettings);
|
||||
|
||||
server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
|
||||
server.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSessionLimitForUser() throws Exception
|
||||
{
|
||||
ServerLocator locator = addServerLocator(createNonHALocator(false));
|
||||
ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
|
||||
ClientSession clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||
|
||||
try
|
||||
{
|
||||
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
|
||||
ClientSession extraClientSession = extraClientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||
fail("creating a session factory here should fail");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||
}
|
||||
|
||||
clientSession.close();
|
||||
|
||||
clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||
|
||||
try
|
||||
{
|
||||
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
|
||||
ClientSession extraClientSession = extraClientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||
fail("creating a session factory here should fail");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueLimitForUser() throws Exception
|
||||
{
|
||||
ServerLocator locator = addServerLocator(createNonHALocator(false));
|
||||
ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
|
||||
ClientSession clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||
clientSession.createQueue("address", "queue");
|
||||
|
||||
try
|
||||
{
|
||||
clientSession.createQueue("address", "anotherQueue");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||
}
|
||||
|
||||
clientSession.deleteQueue("queue");
|
||||
|
||||
clientSession.createQueue("address", "queue");
|
||||
|
||||
try
|
||||
{
|
||||
clientSession.createQueue("address", "anotherQueue");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
clientSession.createSharedQueue(SimpleString.toSimpleString("address"), SimpleString.toSimpleString("anotherQueue"), false);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -73,6 +73,7 @@ public class QueueConcurrentTest extends UnitTestCase
|
|||
new SimpleString("queue1"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
false,
|
||||
false,
|
||||
false);
|
||||
|
|
|
@ -622,4 +622,10 @@ public class FakeQueue implements Queue
|
|||
{
|
||||
return 0.0f;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getUser()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -1410,6 +1410,7 @@ public class QueueImplTest extends UnitTestCase
|
|||
QueueImplTest.address1,
|
||||
name,
|
||||
filter,
|
||||
null,
|
||||
durable,
|
||||
temporary,
|
||||
false,
|
||||
|
|
|
@ -41,6 +41,7 @@ public class FakeQueueFactory implements QueueFactory
|
|||
final SimpleString name,
|
||||
final Filter filter,
|
||||
final PageSubscription subscription,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated)
|
||||
|
@ -50,6 +51,7 @@ public class FakeQueueFactory implements QueueFactory
|
|||
name,
|
||||
filter,
|
||||
subscription,
|
||||
user,
|
||||
durable,
|
||||
temporary,
|
||||
autoCreated,
|
||||
|
|
Loading…
Reference in New Issue