ACTIVEMQ6-70 broker resource limits
Implements basic limits on the number of connections and number of queues a particular user can create to/on the broker.
This commit is contained in:
parent
57d29ed119
commit
f509ce7519
|
@ -664,8 +664,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
|
||||||
AMQServerSession fakeSession = new AMQServerSession(user, pass);
|
AMQServerSession fakeSession = new AMQServerSession(user, pass);
|
||||||
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
|
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
|
||||||
((ActiveMQServerImpl) server).getSecurityStore().check(qName, checkType, fakeSession);
|
((ActiveMQServerImpl) server).getSecurityStore().check(qName, checkType, fakeSession);
|
||||||
|
|
||||||
|
((ActiveMQServerImpl) server).checkQueueCreationLimit(user);
|
||||||
}
|
}
|
||||||
this.server.createQueue(qName, qName, null, false, true);
|
this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, true);
|
||||||
if (dest.isTemporary())
|
if (dest.isTemporary())
|
||||||
{
|
{
|
||||||
connection.registerTempQueue(qName);
|
connection.registerTempQueue(qName);
|
||||||
|
|
|
@ -343,7 +343,7 @@ public class AMQServerSession extends ServerSessionImpl
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
server.createQueue(address, name, filterString, durable, temporary);
|
server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
|
||||||
|
|
||||||
if (temporary)
|
if (temporary)
|
||||||
{
|
{
|
||||||
|
|
|
@ -262,7 +262,7 @@ public final class StompConnection implements RemotingConnection
|
||||||
SimpleString queueName = new SimpleString(queue);
|
SimpleString queueName = new SimpleString(queue);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
manager.getServer().createQueue(queueName, queueName, null, true, false, true);
|
manager.getServer().createQueue(queueName, queueName, SimpleString.toSimpleString(this.getLogin()), null, true, false, true);
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Configuration is used to configure ActiveMQ servers.
|
* A Configuration is used to configure ActiveMQ servers.
|
||||||
|
@ -73,6 +74,21 @@ public interface Configuration
|
||||||
*/
|
*/
|
||||||
Configuration setPersistenceEnabled(boolean enable);
|
Configuration setPersistenceEnabled(boolean enable);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return usernames mapped to ResourceLimitSettings
|
||||||
|
*/
|
||||||
|
Map<String, ResourceLimitSettings> getResourceLimitSettings();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param resourceLimitSettings usernames mapped to ResourceLimitSettings
|
||||||
|
*/
|
||||||
|
Configuration setResourceLimitSettings(Map<String, ResourceLimitSettings> resourceLimitSettings);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param resourceLimitSettings usernames mapped to ResourceLimitSettings
|
||||||
|
*/
|
||||||
|
Configuration addResourceLimitSettings(ResourceLimitSettings resourceLimitSettings);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the period (in milliseconds) to scan configuration files used by deployment. <br>
|
* Returns the period (in milliseconds) to scan configuration files used by deployment. <br>
|
||||||
* Default value is {@value org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_FILE_DEPLOYER_SCAN_PERIOD}.
|
* Default value is {@value org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_FILE_DEPLOYER_SCAN_PERIOD}.
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||||
|
|
||||||
public class ConfigurationImpl implements Configuration, Serializable
|
public class ConfigurationImpl implements Configuration, Serializable
|
||||||
{
|
{
|
||||||
|
@ -200,6 +201,8 @@ public class ConfigurationImpl implements Configuration, Serializable
|
||||||
|
|
||||||
private Map<String, AddressSettings> addressesSettings = new HashMap<String, AddressSettings>();
|
private Map<String, AddressSettings> addressesSettings = new HashMap<String, AddressSettings>();
|
||||||
|
|
||||||
|
private Map<String, ResourceLimitSettings> resourceLimitSettings = new HashMap<String, ResourceLimitSettings>();
|
||||||
|
|
||||||
private Map<String, Set<Role>> securitySettings = new HashMap<String, Set<Role>>();
|
private Map<String, Set<Role>> securitySettings = new HashMap<String, Set<Role>>();
|
||||||
|
|
||||||
protected List<ConnectorServiceConfiguration> connectorServiceConfigurations = new ArrayList<ConnectorServiceConfiguration>();
|
protected List<ConnectorServiceConfiguration> connectorServiceConfigurations = new ArrayList<ConnectorServiceConfiguration>();
|
||||||
|
@ -1023,6 +1026,26 @@ public class ConfigurationImpl implements Configuration, Serializable
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, ResourceLimitSettings> getResourceLimitSettings()
|
||||||
|
{
|
||||||
|
return resourceLimitSettings;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ConfigurationImpl setResourceLimitSettings(final Map<String, ResourceLimitSettings> resourceLimitSettings)
|
||||||
|
{
|
||||||
|
this.resourceLimitSettings = resourceLimitSettings;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ConfigurationImpl addResourceLimitSettings(ResourceLimitSettings resourceLimitSettings)
|
||||||
|
{
|
||||||
|
this.resourceLimitSettings.put(resourceLimitSettings.getMatch().toString(), resourceLimitSettings);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Set<Role>> getSecurityRoles()
|
public Map<String, Set<Role>> getSecurityRoles()
|
||||||
{
|
{
|
||||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||||
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
|
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
|
||||||
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
|
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
|
||||||
|
@ -147,6 +148,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
||||||
|
|
||||||
private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues";
|
private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues";
|
||||||
|
|
||||||
|
private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections";
|
||||||
|
|
||||||
|
private static final String MAX_QUEUES_NODE_NAME = "max-queues";
|
||||||
|
|
||||||
// Attributes ----------------------------------------------------
|
// Attributes ----------------------------------------------------
|
||||||
|
|
||||||
private boolean validateAIO = false;
|
private boolean validateAIO = false;
|
||||||
|
@ -611,6 +616,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
||||||
|
|
||||||
parseAddressSettings(e, config);
|
parseAddressSettings(e, config);
|
||||||
|
|
||||||
|
parseResourceLimits(e, config);
|
||||||
|
|
||||||
parseQueues(e, config);
|
parseQueues(e, config);
|
||||||
|
|
||||||
parseSecurity(e, config);
|
parseSecurity(e, config);
|
||||||
|
@ -689,6 +696,25 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param e
|
||||||
|
* @param config
|
||||||
|
*/
|
||||||
|
private void parseResourceLimits(final Element e, final Configuration config)
|
||||||
|
{
|
||||||
|
NodeList elements = e.getElementsByTagName("resource-limit-settings");
|
||||||
|
|
||||||
|
if (elements.getLength() != 0)
|
||||||
|
{
|
||||||
|
Element node = (Element) elements.item(0);
|
||||||
|
NodeList list = node.getElementsByTagName("resource-limit-setting");
|
||||||
|
for (int i = 0; i < list.getLength(); i++)
|
||||||
|
{
|
||||||
|
config.addResourceLimitSettings(parseResourceLimitSettings(list.item(i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param node
|
* @param node
|
||||||
* @return
|
* @return
|
||||||
|
@ -903,6 +929,34 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
||||||
return setting;
|
return setting;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param node
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
protected ResourceLimitSettings parseResourceLimitSettings(final Node node)
|
||||||
|
{
|
||||||
|
ResourceLimitSettings resourceLimitSettings = new ResourceLimitSettings();
|
||||||
|
|
||||||
|
resourceLimitSettings.setMatch(SimpleString.toSimpleString(getAttributeValue(node, "match")));
|
||||||
|
|
||||||
|
NodeList children = node.getChildNodes();
|
||||||
|
|
||||||
|
for (int i = 0; i < children.getLength(); i++)
|
||||||
|
{
|
||||||
|
final Node child = children.item(i);
|
||||||
|
final String name = child.getNodeName();
|
||||||
|
if (MAX_CONNECTIONS_NODE_NAME.equalsIgnoreCase(name))
|
||||||
|
{
|
||||||
|
resourceLimitSettings.setMaxConnections(XMLUtil.parseInt(child));
|
||||||
|
}
|
||||||
|
else if (MAX_QUEUES_NODE_NAME.equalsIgnoreCase(name))
|
||||||
|
{
|
||||||
|
resourceLimitSettings.setMaxQueues(XMLUtil.parseInt(child));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return resourceLimitSettings;
|
||||||
|
}
|
||||||
|
|
||||||
protected CoreQueueConfiguration parseQueueConfiguration(final Node node)
|
protected CoreQueueConfiguration parseQueueConfiguration(final Node node)
|
||||||
{
|
{
|
||||||
String name = getAttributeValue(node, "name");
|
String name = getAttributeValue(node, "name");
|
||||||
|
|
|
@ -36,4 +36,6 @@ public interface QueueBindingInfo
|
||||||
|
|
||||||
boolean isAutoCreated();
|
boolean isAutoCreated();
|
||||||
|
|
||||||
|
SimpleString getUser();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2027,6 +2027,7 @@ public class JournalStorageManager implements StorageManager
|
||||||
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
|
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
|
||||||
binding.getAddress(),
|
binding.getAddress(),
|
||||||
filterString,
|
filterString,
|
||||||
|
queue.getUser(),
|
||||||
queue.isAutoCreated());
|
queue.isAutoCreated());
|
||||||
|
|
||||||
readLock();
|
readLock();
|
||||||
|
@ -3045,6 +3046,8 @@ public class JournalStorageManager implements StorageManager
|
||||||
|
|
||||||
public boolean autoCreated;
|
public boolean autoCreated;
|
||||||
|
|
||||||
|
public SimpleString user;
|
||||||
|
|
||||||
public PersistentQueueBindingEncoding()
|
public PersistentQueueBindingEncoding()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@ -3059,6 +3062,8 @@ public class JournalStorageManager implements StorageManager
|
||||||
address +
|
address +
|
||||||
", filterString=" +
|
", filterString=" +
|
||||||
filterString +
|
filterString +
|
||||||
|
", user=" +
|
||||||
|
user +
|
||||||
", autoCreated=" +
|
", autoCreated=" +
|
||||||
autoCreated +
|
autoCreated +
|
||||||
"]";
|
"]";
|
||||||
|
@ -3067,11 +3072,13 @@ public class JournalStorageManager implements StorageManager
|
||||||
public PersistentQueueBindingEncoding(final SimpleString name,
|
public PersistentQueueBindingEncoding(final SimpleString name,
|
||||||
final SimpleString address,
|
final SimpleString address,
|
||||||
final SimpleString filterString,
|
final SimpleString filterString,
|
||||||
|
final SimpleString user,
|
||||||
final boolean autoCreated)
|
final boolean autoCreated)
|
||||||
{
|
{
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.address = address;
|
this.address = address;
|
||||||
this.filterString = filterString;
|
this.filterString = filterString;
|
||||||
|
this.user = user;
|
||||||
this.autoCreated = autoCreated;
|
this.autoCreated = autoCreated;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3105,6 +3112,11 @@ public class JournalStorageManager implements StorageManager
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SimpleString getUser()
|
||||||
|
{
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isAutoCreated()
|
public boolean isAutoCreated()
|
||||||
{
|
{
|
||||||
return autoCreated;
|
return autoCreated;
|
||||||
|
@ -3115,6 +3127,24 @@ public class JournalStorageManager implements StorageManager
|
||||||
name = buffer.readSimpleString();
|
name = buffer.readSimpleString();
|
||||||
address = buffer.readSimpleString();
|
address = buffer.readSimpleString();
|
||||||
filterString = buffer.readNullableSimpleString();
|
filterString = buffer.readNullableSimpleString();
|
||||||
|
|
||||||
|
String metadata = buffer.readNullableSimpleString().toString();
|
||||||
|
if (metadata != null)
|
||||||
|
{
|
||||||
|
String[] elements = metadata.split(";");
|
||||||
|
for (String element : elements)
|
||||||
|
{
|
||||||
|
String[] keyValuePair = element.split("=");
|
||||||
|
if (keyValuePair.length == 2)
|
||||||
|
{
|
||||||
|
if (keyValuePair[0].equals("user"))
|
||||||
|
{
|
||||||
|
user = SimpleString.toSimpleString(keyValuePair[1]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
autoCreated = buffer.readBoolean();
|
autoCreated = buffer.readBoolean();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3123,13 +3153,22 @@ public class JournalStorageManager implements StorageManager
|
||||||
buffer.writeSimpleString(name);
|
buffer.writeSimpleString(name);
|
||||||
buffer.writeSimpleString(address);
|
buffer.writeSimpleString(address);
|
||||||
buffer.writeNullableSimpleString(filterString);
|
buffer.writeNullableSimpleString(filterString);
|
||||||
|
buffer.writeNullableSimpleString(createMetadata());
|
||||||
buffer.writeBoolean(autoCreated);
|
buffer.writeBoolean(autoCreated);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getEncodeSize()
|
public int getEncodeSize()
|
||||||
{
|
{
|
||||||
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
|
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
|
||||||
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN;
|
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
|
||||||
|
SimpleString.sizeofNullableString(createMetadata());
|
||||||
|
}
|
||||||
|
|
||||||
|
private SimpleString createMetadata()
|
||||||
|
{
|
||||||
|
StringBuilder metadata = new StringBuilder();
|
||||||
|
metadata.append("user=").append(user).append(";");
|
||||||
|
return SimpleString.toSimpleString(metadata.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -353,4 +353,10 @@ public interface ActiveMQMessageBundle
|
||||||
|
|
||||||
@Message(id = 119109, value = "unsupported HA Policy Configuration {0}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 119109, value = "unsupported HA Policy Configuration {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
ActiveMQIllegalStateException unsupportedHAPolicyConfiguration(Object o);
|
ActiveMQIllegalStateException unsupportedHAPolicyConfiguration(Object o);
|
||||||
|
|
||||||
|
@Message(id = 119110, value = "Too many sessions for user ''{0}''. Sessions allowed: {1}.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
ActiveMQSessionCreationException sessionLimitReached(String username, int limit);
|
||||||
|
|
||||||
|
@Message(id = 119111, value = "Too many queues created by user ''{0}''. Queues allowed: {1}.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
ActiveMQSessionCreationException queueLimitReached(String username, int limit);
|
||||||
}
|
}
|
||||||
|
|
|
@ -181,6 +181,7 @@ public interface ActiveMQServer extends ActiveMQComponent
|
||||||
void createSharedQueue(final SimpleString address,
|
void createSharedQueue(final SimpleString address,
|
||||||
final SimpleString name,
|
final SimpleString name,
|
||||||
final SimpleString filterString,
|
final SimpleString filterString,
|
||||||
|
final SimpleString user,
|
||||||
boolean durable) throws Exception;
|
boolean durable) throws Exception;
|
||||||
|
|
||||||
Queue createQueue(SimpleString address,
|
Queue createQueue(SimpleString address,
|
||||||
|
@ -192,6 +193,14 @@ public interface ActiveMQServer extends ActiveMQComponent
|
||||||
Queue createQueue(SimpleString address,
|
Queue createQueue(SimpleString address,
|
||||||
SimpleString queueName,
|
SimpleString queueName,
|
||||||
SimpleString filter,
|
SimpleString filter,
|
||||||
|
SimpleString user,
|
||||||
|
boolean durable,
|
||||||
|
boolean temporary) throws Exception;
|
||||||
|
|
||||||
|
Queue createQueue(SimpleString address,
|
||||||
|
SimpleString queueName,
|
||||||
|
SimpleString filter,
|
||||||
|
SimpleString user,
|
||||||
boolean durable,
|
boolean durable,
|
||||||
boolean temporary,
|
boolean temporary,
|
||||||
boolean autoCreated) throws Exception;
|
boolean autoCreated) throws Exception;
|
||||||
|
|
|
@ -235,4 +235,9 @@ public interface Queue extends Bindable
|
||||||
void postAcknowledge(MessageReference ref);
|
void postAcknowledge(MessageReference ref);
|
||||||
|
|
||||||
float getRate();
|
float getRate();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the user who created this queue
|
||||||
|
*/
|
||||||
|
SimpleString getUser();
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ public interface QueueFactory
|
||||||
SimpleString name,
|
SimpleString name,
|
||||||
Filter filter,
|
Filter filter,
|
||||||
PageSubscription pageSubscription,
|
PageSubscription pageSubscription,
|
||||||
|
SimpleString user,
|
||||||
boolean durable,
|
boolean durable,
|
||||||
boolean temporary,
|
boolean temporary,
|
||||||
boolean autoCreated);
|
boolean autoCreated);
|
||||||
|
|
|
@ -117,6 +117,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
||||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
|
||||||
import org.apache.activemq.artemis.core.version.Version;
|
import org.apache.activemq.artemis.core.version.Version;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||||
|
@ -1045,6 +1046,9 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
{
|
{
|
||||||
securityStore.authenticate(username, password);
|
securityStore.authenticate(username, password);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkSessionLimit(username);
|
||||||
|
|
||||||
final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
|
final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
|
||||||
final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize,
|
final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize,
|
||||||
connection, autoCommitSends, autoCommitAcks, preAcknowledge,
|
connection, autoCommitSends, autoCommitAcks, preAcknowledge,
|
||||||
|
@ -1055,6 +1059,73 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
return session;
|
return session;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkSessionLimit(String username) throws Exception
|
||||||
|
{
|
||||||
|
if (configuration.getResourceLimitSettings() != null && configuration.getResourceLimitSettings().containsKey(username))
|
||||||
|
{
|
||||||
|
ResourceLimitSettings limits = configuration.getResourceLimitSettings().get(username);
|
||||||
|
|
||||||
|
if (limits.getMaxConnections() == -1)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else if (limits.getMaxConnections() == 0 || getSessionCountForUser(username) >= limits.getMaxConnections())
|
||||||
|
{
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.sessionLimitReached(username, limits.getMaxConnections());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getSessionCountForUser(String username)
|
||||||
|
{
|
||||||
|
int sessionCount = 0;
|
||||||
|
|
||||||
|
for (Entry<String, ServerSession> sessionEntry : sessions.entrySet())
|
||||||
|
{
|
||||||
|
if (sessionEntry.getValue().getUsername().toString().equals(username))
|
||||||
|
{
|
||||||
|
sessionCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sessionCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void checkQueueCreationLimit(String username) throws Exception
|
||||||
|
{
|
||||||
|
if (configuration.getResourceLimitSettings() != null && configuration.getResourceLimitSettings().containsKey(username))
|
||||||
|
{
|
||||||
|
ResourceLimitSettings limits = configuration.getResourceLimitSettings().get(username);
|
||||||
|
|
||||||
|
if (limits.getMaxQueues() == -1)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else if (limits.getMaxQueues() == 0 || getQueueCountForUser(username) >= limits.getMaxQueues())
|
||||||
|
{
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.queueLimitReached(username, limits.getMaxConnections());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getQueueCountForUser(String username) throws Exception
|
||||||
|
{
|
||||||
|
Map<SimpleString, Binding> bindings = postOffice.getAllBindings();
|
||||||
|
|
||||||
|
int queuesForUser = 0;
|
||||||
|
|
||||||
|
for (Binding binding : bindings.values())
|
||||||
|
{
|
||||||
|
if (binding instanceof LocalQueueBinding && ((LocalQueueBinding) binding).getQueue().getUser().equals(SimpleString.toSimpleString(username)))
|
||||||
|
{
|
||||||
|
queuesForUser++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return queuesForUser;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
protected ServerSessionImpl internalCreateSession(String name, String username,
|
protected ServerSessionImpl internalCreateSession(String name, String username,
|
||||||
String password, int minLargeMessageSize,
|
String password, int minLargeMessageSize,
|
||||||
RemotingConnection connection, boolean autoCommitSends,
|
RemotingConnection connection, boolean autoCommitSends,
|
||||||
|
@ -1207,17 +1278,28 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary) throws Exception
|
final boolean temporary) throws Exception
|
||||||
{
|
{
|
||||||
return createQueue(address, queueName, filterString, durable, temporary, false, false, false);
|
return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Queue createQueue(final SimpleString address,
|
public Queue createQueue(final SimpleString address,
|
||||||
final SimpleString queueName,
|
final SimpleString queueName,
|
||||||
final SimpleString filterString,
|
final SimpleString filterString,
|
||||||
|
final SimpleString user,
|
||||||
|
final boolean durable,
|
||||||
|
final boolean temporary) throws Exception
|
||||||
|
{
|
||||||
|
return createQueue(address, queueName, filterString, user, durable, temporary, false, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Queue createQueue(final SimpleString address,
|
||||||
|
final SimpleString queueName,
|
||||||
|
final SimpleString filterString,
|
||||||
|
final SimpleString user,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
final boolean autoCreated) throws Exception
|
final boolean autoCreated) throws Exception
|
||||||
{
|
{
|
||||||
return createQueue(address, queueName, filterString, durable, temporary, false, false, autoCreated);
|
return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1235,9 +1317,10 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
public void createSharedQueue(final SimpleString address,
|
public void createSharedQueue(final SimpleString address,
|
||||||
final SimpleString name,
|
final SimpleString name,
|
||||||
final SimpleString filterString,
|
final SimpleString filterString,
|
||||||
|
final SimpleString user,
|
||||||
boolean durable) throws Exception
|
boolean durable) throws Exception
|
||||||
{
|
{
|
||||||
Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable, false);
|
Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false);
|
||||||
|
|
||||||
if (!queue.getAddress().equals(address))
|
if (!queue.getAddress().equals(address))
|
||||||
{
|
{
|
||||||
|
@ -1286,7 +1369,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
{
|
{
|
||||||
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
|
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
|
||||||
|
|
||||||
return createQueue(address, queueName, filterString, durable, temporary, true, false, false);
|
return createQueue(address, queueName, filterString, null, durable, temporary, true, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void destroyQueue(final SimpleString queueName) throws Exception
|
public void destroyQueue(final SimpleString queueName) throws Exception
|
||||||
|
@ -1960,6 +2043,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
private Queue createQueue(final SimpleString address,
|
private Queue createQueue(final SimpleString address,
|
||||||
final SimpleString queueName,
|
final SimpleString queueName,
|
||||||
final SimpleString filterString,
|
final SimpleString filterString,
|
||||||
|
final SimpleString user,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
final boolean ignoreIfExists,
|
final boolean ignoreIfExists,
|
||||||
|
@ -2003,6 +2087,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
queueName,
|
queueName,
|
||||||
filter,
|
filter,
|
||||||
pageSubscription,
|
pageSubscription,
|
||||||
|
user,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
autoCreated);
|
autoCreated);
|
||||||
|
|
|
@ -51,6 +51,7 @@ public class LastValueQueue extends QueueImpl
|
||||||
final SimpleString name,
|
final SimpleString name,
|
||||||
final Filter filter,
|
final Filter filter,
|
||||||
final PageSubscription pageSubscription,
|
final PageSubscription pageSubscription,
|
||||||
|
final SimpleString user,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
final boolean autoCreated,
|
final boolean autoCreated,
|
||||||
|
@ -65,6 +66,7 @@ public class LastValueQueue extends QueueImpl
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
pageSubscription,
|
pageSubscription,
|
||||||
|
user,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
autoCreated,
|
autoCreated,
|
||||||
|
|
|
@ -155,6 +155,7 @@ public class PostOfficeJournalLoader implements JournalLoader
|
||||||
queueBindingInfo.getQueueName(),
|
queueBindingInfo.getQueueName(),
|
||||||
filter,
|
filter,
|
||||||
subscription,
|
subscription,
|
||||||
|
queueBindingInfo.getUser(),
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
queueBindingInfo.isAutoCreated());
|
queueBindingInfo.isAutoCreated());
|
||||||
|
|
|
@ -70,6 +70,7 @@ public class QueueFactoryImpl implements QueueFactory
|
||||||
final SimpleString name,
|
final SimpleString name,
|
||||||
final Filter filter,
|
final Filter filter,
|
||||||
final PageSubscription pageSubscription,
|
final PageSubscription pageSubscription,
|
||||||
|
final SimpleString user,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
final boolean autoCreated)
|
final boolean autoCreated)
|
||||||
|
@ -84,6 +85,7 @@ public class QueueFactoryImpl implements QueueFactory
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
pageSubscription,
|
pageSubscription,
|
||||||
|
user,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
autoCreated,
|
autoCreated,
|
||||||
|
@ -100,6 +102,7 @@ public class QueueFactoryImpl implements QueueFactory
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
pageSubscription,
|
pageSubscription,
|
||||||
|
user,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
autoCreated,
|
autoCreated,
|
||||||
|
|
|
@ -119,6 +119,8 @@ public class QueueImpl implements Queue
|
||||||
|
|
||||||
private final SimpleString name;
|
private final SimpleString name;
|
||||||
|
|
||||||
|
private final SimpleString user;
|
||||||
|
|
||||||
private volatile Filter filter;
|
private volatile Filter filter;
|
||||||
|
|
||||||
private final boolean durable;
|
private final boolean durable;
|
||||||
|
@ -309,6 +311,7 @@ public class QueueImpl implements Queue
|
||||||
final SimpleString address,
|
final SimpleString address,
|
||||||
final SimpleString name,
|
final SimpleString name,
|
||||||
final Filter filter,
|
final Filter filter,
|
||||||
|
final SimpleString user,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
final boolean autoCreated,
|
final boolean autoCreated,
|
||||||
|
@ -323,6 +326,7 @@ public class QueueImpl implements Queue
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
null,
|
null,
|
||||||
|
user,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
autoCreated,
|
autoCreated,
|
||||||
|
@ -338,6 +342,7 @@ public class QueueImpl implements Queue
|
||||||
final SimpleString name,
|
final SimpleString name,
|
||||||
final Filter filter,
|
final Filter filter,
|
||||||
final PageSubscription pageSubscription,
|
final PageSubscription pageSubscription,
|
||||||
|
final SimpleString user,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
final boolean autoCreated,
|
final boolean autoCreated,
|
||||||
|
@ -395,6 +400,7 @@ public class QueueImpl implements Queue
|
||||||
|
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
|
|
||||||
|
this.user = user;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bindable implementation -------------------------------------------------------------------------------------
|
// Bindable implementation -------------------------------------------------------------------------------------
|
||||||
|
@ -409,6 +415,11 @@ public class QueueImpl implements Queue
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SimpleString getUser()
|
||||||
|
{
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isExclusive()
|
public boolean isExclusive()
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -559,14 +559,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener
|
||||||
securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
((ActiveMQServerImpl)server).checkQueueCreationLimit(getUsername());
|
||||||
|
|
||||||
// any non-temporary JMS queue created via this method should be marked as auto-created
|
// any non-temporary JMS queue created via this method should be marked as auto-created
|
||||||
if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name))
|
if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name))
|
||||||
{
|
{
|
||||||
server.createQueue(address, name, filterString, durable, temporary, true);
|
server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
server.createQueue(address, name, filterString, durable, temporary);
|
server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (temporary)
|
if (temporary)
|
||||||
|
@ -602,7 +604,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener
|
||||||
{
|
{
|
||||||
securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
||||||
|
|
||||||
server.createSharedQueue(address, name, filterString, durable);
|
((ActiveMQServerImpl)server).checkQueueCreationLimit(getUsername());
|
||||||
|
|
||||||
|
server.createSharedQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RemotingConnection getRemotingConnection()
|
public RemotingConnection getRemotingConnection()
|
||||||
|
|
|
@ -0,0 +1,173 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.core.settings.impl;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
|
import org.apache.activemq.artemis.utils.BufferHelper;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class ResourceLimitSettings implements Serializable, EncodingSupport
|
||||||
|
{
|
||||||
|
private static final long serialVersionUID = -110638321333856932L;
|
||||||
|
|
||||||
|
public static final SimpleString DEFAULT_MATCH = null;
|
||||||
|
|
||||||
|
public static final Integer DEFAULT_MAX_CONNECTIONS = -1;
|
||||||
|
|
||||||
|
public static final Integer DEFAULT_MAX_QUEUES = -1;
|
||||||
|
|
||||||
|
// public static final Long DEFAULT_MAX_QUEUE_SIZE_BYTES = -1L;
|
||||||
|
|
||||||
|
// public static final SimpleString DEFAULT_QUEUE_NAME_REGEX = new SimpleString(".+");
|
||||||
|
|
||||||
|
SimpleString match = null;
|
||||||
|
|
||||||
|
Integer maxConnections = null;
|
||||||
|
|
||||||
|
Integer maxQueues = null;
|
||||||
|
|
||||||
|
// Long maxQueueSizeBytes = null;
|
||||||
|
|
||||||
|
// SimpleString queueNameRegex = null;
|
||||||
|
|
||||||
|
public SimpleString getMatch()
|
||||||
|
{
|
||||||
|
return match != null ? match : DEFAULT_MATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxConnections()
|
||||||
|
{
|
||||||
|
return maxConnections != null ? maxConnections : DEFAULT_MAX_CONNECTIONS;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxQueues()
|
||||||
|
{
|
||||||
|
return maxQueues != null ? maxQueues : DEFAULT_MAX_QUEUES;
|
||||||
|
}
|
||||||
|
|
||||||
|
// public long getMaxQueueSizeBytes()
|
||||||
|
// {
|
||||||
|
// return maxQueueSizeBytes != null ? maxQueueSizeBytes : DEFAULT_MAX_QUEUE_SIZE_BYTES;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// public SimpleString getQueueNameRegex()
|
||||||
|
// {
|
||||||
|
// return queueNameRegex != null ? queueNameRegex : DEFAULT_QUEUE_NAME_REGEX;
|
||||||
|
// }
|
||||||
|
|
||||||
|
public void setMatch(SimpleString match)
|
||||||
|
{
|
||||||
|
this.match = match;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxConnections(int maxConnections)
|
||||||
|
{
|
||||||
|
this.maxConnections = maxConnections;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxQueues(int maxQueues)
|
||||||
|
{
|
||||||
|
this.maxQueues = maxQueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
// public void setMaxQueueSizeBytes(long maxQueueSizeBytes)
|
||||||
|
// {
|
||||||
|
// this.maxQueueSizeBytes = maxQueueSizeBytes;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// public void setQueueNameRegex(SimpleString queueNameRegex)
|
||||||
|
// {
|
||||||
|
// this.queueNameRegex = queueNameRegex;
|
||||||
|
// }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getEncodeSize()
|
||||||
|
{
|
||||||
|
return SimpleString.sizeofNullableString(match) +
|
||||||
|
BufferHelper.sizeOfNullableInteger(maxConnections) +
|
||||||
|
BufferHelper.sizeOfNullableInteger(maxQueues);
|
||||||
|
// BufferHelper.sizeOfNullableLong(maxQueueSizeBytes) +
|
||||||
|
// SimpleString.sizeofNullableString(queueNameRegex);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void encode(ActiveMQBuffer buffer)
|
||||||
|
{
|
||||||
|
buffer.writeNullableSimpleString(match);
|
||||||
|
|
||||||
|
BufferHelper.writeNullableInteger(buffer, maxConnections);
|
||||||
|
|
||||||
|
BufferHelper.writeNullableInteger(buffer, maxQueues);
|
||||||
|
|
||||||
|
// BufferHelper.writeNullableLong(buffer, maxQueueSizeBytes);
|
||||||
|
|
||||||
|
// buffer.writeNullableSimpleString(queueNameRegex);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void decode(ActiveMQBuffer buffer)
|
||||||
|
{
|
||||||
|
match = buffer.readNullableSimpleString();
|
||||||
|
|
||||||
|
maxConnections = BufferHelper.readNullableInteger(buffer);
|
||||||
|
|
||||||
|
maxQueues = BufferHelper.readNullableInteger(buffer);
|
||||||
|
|
||||||
|
// maxQueueSizeBytes = BufferHelper.readNullableLong(buffer);
|
||||||
|
|
||||||
|
// queueNameRegex = buffer.readNullableSimpleString();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* (non-Javadoc)
|
||||||
|
* @see java.lang.Object#hashCode()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
final int prime = 31;
|
||||||
|
int result = 1;
|
||||||
|
result = prime * result + ((match == null) ? 0 : match.hashCode());
|
||||||
|
result = prime * result + ((maxConnections == null) ? 0 : maxConnections.hashCode());
|
||||||
|
result = prime * result + ((maxQueues == null) ? 0 : maxQueues.hashCode());
|
||||||
|
// result = prime * result + ((maxQueueSizeBytes == null) ? 0 : maxQueueSizeBytes.hashCode());
|
||||||
|
// result = prime * result + ((queueNameRegex == null) ? 0 : queueNameRegex.hashCode());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* (non-Javadoc)
|
||||||
|
* @see java.lang.Object#toString()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "ResourceLimitSettings [match=" + match +
|
||||||
|
", maxConnections=" +
|
||||||
|
maxConnections +
|
||||||
|
", maxQueues=" +
|
||||||
|
maxQueues +
|
||||||
|
// ", maxQueueSizeBytes=" +
|
||||||
|
// maxQueueSizeBytes +
|
||||||
|
// ", queueNameRegex=" +
|
||||||
|
// queueNameRegex +
|
||||||
|
"]";
|
||||||
|
}
|
||||||
|
}
|
|
@ -728,6 +728,19 @@
|
||||||
</xsd:complexType>
|
</xsd:complexType>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="resource-limit-settings" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
a list of resource limit settings
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
<xsd:complexType>
|
||||||
|
<xsd:sequence>
|
||||||
|
<xsd:element ref="resource-limit-setting" maxOccurs="unbounded" minOccurs="0"/>
|
||||||
|
</xsd:sequence>
|
||||||
|
</xsd:complexType>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
<xsd:element name="connector-services" maxOccurs="1" minOccurs="0">
|
<xsd:element name="connector-services" maxOccurs="1" minOccurs="0">
|
||||||
<xsd:complexType>
|
<xsd:complexType>
|
||||||
<xsd:sequence>
|
<xsd:sequence>
|
||||||
|
@ -2103,6 +2116,41 @@
|
||||||
</xsd:complexType>
|
</xsd:complexType>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="resource-limit-setting">
|
||||||
|
<xsd:complexType>
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
Complex type element to configure resource limits for a particular user.
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
<xsd:all>
|
||||||
|
<xsd:element name="max-connections" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
how many connections are allowed by the matched entity (-1 means no limit, default is -1)
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="max-queues" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
how many queues can be created by the matched entity (-1 means no limit, default is -1)
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
</xsd:all>
|
||||||
|
|
||||||
|
<xsd:attribute name="match" type="xsd:string" use="required">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the name of the user to whom the limits should be applied
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
</xsd:complexType>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
<xsd:element name="filter">
|
<xsd:element name="filter">
|
||||||
<xsd:complexType>
|
<xsd:complexType>
|
||||||
<xsd:annotation>
|
<xsd:annotation>
|
||||||
|
|
|
@ -315,6 +315,9 @@ public class FileConfigurationTest extends ConfigurationImplTest
|
||||||
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues());
|
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues());
|
||||||
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues());
|
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues());
|
||||||
|
|
||||||
|
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
||||||
|
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
||||||
|
assertEquals(13, conf.getResourceLimitSettings().get("myUser").getMaxQueues());
|
||||||
|
|
||||||
assertEquals(2, conf.getQueueConfigurations().size());
|
assertEquals(2, conf.getQueueConfigurations().size());
|
||||||
|
|
||||||
|
|
|
@ -1512,5 +1512,11 @@ public class ScheduledDeliveryHandlerTest extends Assert
|
||||||
{
|
{
|
||||||
return 0.0f;
|
return 0.0f;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SimpleString getUser()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -264,6 +264,12 @@
|
||||||
<auto-delete-jms-queues>false</auto-delete-jms-queues>
|
<auto-delete-jms-queues>false</auto-delete-jms-queues>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
||||||
|
<resource-limit-settings>
|
||||||
|
<resource-limit-setting match="myUser">
|
||||||
|
<max-connections>104</max-connections>
|
||||||
|
<max-queues>13</max-queues>
|
||||||
|
</resource-limit-setting>
|
||||||
|
</resource-limit-settings>
|
||||||
<connector-services>
|
<connector-services>
|
||||||
<connector-service>
|
<connector-service>
|
||||||
<factory-class>org.foo</factory-class>
|
<factory-class>org.foo</factory-class>
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
* [Extra Acknowledge Modes](pre-acknowledge.md)
|
* [Extra Acknowledge Modes](pre-acknowledge.md)
|
||||||
* [Management](management.md)
|
* [Management](management.md)
|
||||||
* [Security](security.md)
|
* [Security](security.md)
|
||||||
|
* [Resource Limits](resource-limits.md)
|
||||||
* [The JMS Bridge](jms-bridge.md)
|
* [The JMS Bridge](jms-bridge.md)
|
||||||
* [Client Reconnection and Session Reattachment](client-reconnection.md)
|
* [Client Reconnection and Session Reattachment](client-reconnection.md)
|
||||||
* [Diverting and Splitting Message Flows](diverts.md)
|
* [Diverting and Splitting Message Flows](diverts.md)
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
# Resource Limits
|
||||||
|
|
||||||
|
Sometimes it's helpful to set particular limits on what certain users can
|
||||||
|
do beyond the normal security settings related to authorization and
|
||||||
|
authentication. For example, limiting how many connections a user can create
|
||||||
|
or how many queues a user can create. This chapter will explain how to
|
||||||
|
configure such limits.
|
||||||
|
|
||||||
|
## Configuring Limits Via Resource Limit Settings
|
||||||
|
|
||||||
|
Here is an example of the XML used to set resource limits:
|
||||||
|
|
||||||
|
<resource-limit-settings>
|
||||||
|
<resource-limit-setting match="myUser">
|
||||||
|
<max-connections>5</max-connections>
|
||||||
|
<max-queues>3</max-queues>
|
||||||
|
</resource-limit-setting>
|
||||||
|
</resource-limit-settings>
|
||||||
|
|
||||||
|
Unlike the `match` from `address-setting`, this `match` does not use
|
||||||
|
any wild-card syntax. It's a simple 1:1 mapping of the limits to a user.
|
||||||
|
|
||||||
|
`max-connections` defines how many connections the matched user can make
|
||||||
|
to the broker. The default is -1 which means there is no limit.
|
||||||
|
|
||||||
|
`max-queues` defines how many queues the matched user can create. The default
|
||||||
|
is -1 which means there is no limit.
|
|
@ -15,6 +15,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.client;
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -103,6 +104,26 @@ public class DurableQueueTest extends ServiceTestBase
|
||||||
session.deleteQueue(queue);
|
session.deleteQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUserEncoding() throws Exception
|
||||||
|
{
|
||||||
|
final String userName = "myUser";
|
||||||
|
session.close();
|
||||||
|
session = sf.createSession(userName, "myPass", false, true, true, false, 0);
|
||||||
|
|
||||||
|
SimpleString queue = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
session.createQueue(address, queue, true);
|
||||||
|
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
assertEquals(1, ((ActiveMQServerImpl) server).getQueueCountForUser(userName));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProduceAndConsumeFromDurableQueueAfterServerRestart() throws Exception
|
public void testProduceAndConsumeFromDurableQueueAfterServerRestart() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -237,6 +237,7 @@ public class HangConsumerTest extends ServiceTestBase
|
||||||
final SimpleString address,
|
final SimpleString address,
|
||||||
final SimpleString name,
|
final SimpleString name,
|
||||||
final Filter filter,
|
final Filter filter,
|
||||||
|
final SimpleString user,
|
||||||
final PageSubscription pageSubscription,
|
final PageSubscription pageSubscription,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
|
@ -252,6 +253,7 @@ public class HangConsumerTest extends ServiceTestBase
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
pageSubscription,
|
pageSubscription,
|
||||||
|
user,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
autoCreated,
|
autoCreated,
|
||||||
|
@ -293,6 +295,7 @@ public class HangConsumerTest extends ServiceTestBase
|
||||||
final SimpleString name,
|
final SimpleString name,
|
||||||
final Filter filter,
|
final Filter filter,
|
||||||
final PageSubscription pageSubscription,
|
final PageSubscription pageSubscription,
|
||||||
|
final SimpleString user,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
final boolean autoCreated)
|
final boolean autoCreated)
|
||||||
|
@ -301,6 +304,7 @@ public class HangConsumerTest extends ServiceTestBase
|
||||||
address,
|
address,
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
|
user,
|
||||||
pageSubscription,
|
pageSubscription,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
|
@ -403,7 +407,7 @@ public class HangConsumerTest extends ServiceTestBase
|
||||||
|
|
||||||
|
|
||||||
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
|
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
|
||||||
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, true, false, false, null, null, null, null, null), server.getNodeID());
|
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
|
||||||
server.getStorageManager().addQueueBinding(txID, newBinding);
|
server.getStorageManager().addQueueBinding(txID, newBinding);
|
||||||
server.getStorageManager().commitBindings(txID);
|
server.getStorageManager().commitBindings(txID);
|
||||||
|
|
||||||
|
|
|
@ -498,6 +498,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
|
||||||
SimpleString address,
|
SimpleString address,
|
||||||
SimpleString name,
|
SimpleString name,
|
||||||
Filter filter,
|
Filter filter,
|
||||||
|
SimpleString user,
|
||||||
PageSubscription pageSubscription,
|
PageSubscription pageSubscription,
|
||||||
boolean durable,
|
boolean durable,
|
||||||
boolean temporary,
|
boolean temporary,
|
||||||
|
@ -513,6 +514,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
pageSubscription,
|
pageSubscription,
|
||||||
|
user,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
autoCreated,
|
autoCreated,
|
||||||
|
@ -566,6 +568,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
|
||||||
SimpleString name,
|
SimpleString name,
|
||||||
Filter filter,
|
Filter filter,
|
||||||
PageSubscription pageSubscription,
|
PageSubscription pageSubscription,
|
||||||
|
SimpleString user,
|
||||||
boolean durable,
|
boolean durable,
|
||||||
boolean temporary,
|
boolean temporary,
|
||||||
boolean autoCreated)
|
boolean autoCreated)
|
||||||
|
@ -575,6 +578,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
|
||||||
address,
|
address,
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
|
user,
|
||||||
pageSubscription,
|
pageSubscription,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class TopicCleanupTest extends JMSTestBase
|
||||||
{
|
{
|
||||||
long txid = storage.generateID();
|
long txid = storage.generateID();
|
||||||
|
|
||||||
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false, false, server.getScheduledPool(), server.getPostOffice(),
|
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(),
|
||||||
storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
|
storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
|
||||||
|
|
||||||
LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
|
LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
|
||||||
|
|
|
@ -0,0 +1,131 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.server;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||||
|
import org.apache.activemq.artemis.tests.util.UnitTestCase;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class ResourceLimitTest extends UnitTestCase
|
||||||
|
{
|
||||||
|
private ActiveMQServer server;
|
||||||
|
|
||||||
|
private TransportConfiguration liveTC;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
ResourceLimitSettings resourceLimitSettings = new ResourceLimitSettings();
|
||||||
|
resourceLimitSettings.setMatch(SimpleString.toSimpleString("myUser"));
|
||||||
|
resourceLimitSettings.setMaxConnections(1);
|
||||||
|
resourceLimitSettings.setMaxQueues(1);
|
||||||
|
|
||||||
|
Configuration configuration = createBasicConfig()
|
||||||
|
.addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY))
|
||||||
|
.addResourceLimitSettings(resourceLimitSettings);
|
||||||
|
|
||||||
|
server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSessionLimitForUser() throws Exception
|
||||||
|
{
|
||||||
|
ServerLocator locator = addServerLocator(createNonHALocator(false));
|
||||||
|
ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
|
||||||
|
ClientSession clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
|
||||||
|
ClientSession extraClientSession = extraClientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||||
|
fail("creating a session factory here should fail");
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||||
|
}
|
||||||
|
|
||||||
|
clientSession.close();
|
||||||
|
|
||||||
|
clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
|
||||||
|
ClientSession extraClientSession = extraClientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||||
|
fail("creating a session factory here should fail");
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueLimitForUser() throws Exception
|
||||||
|
{
|
||||||
|
ServerLocator locator = addServerLocator(createNonHALocator(false));
|
||||||
|
ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
|
||||||
|
ClientSession clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||||
|
clientSession.createQueue("address", "queue");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
clientSession.createQueue("address", "anotherQueue");
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||||
|
}
|
||||||
|
|
||||||
|
clientSession.deleteQueue("queue");
|
||||||
|
|
||||||
|
clientSession.createQueue("address", "queue");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
clientSession.createQueue("address", "anotherQueue");
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
clientSession.createSharedQueue(SimpleString.toSimpleString("address"), SimpleString.toSimpleString("anotherQueue"), false);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -73,6 +73,7 @@ public class QueueConcurrentTest extends UnitTestCase
|
||||||
new SimpleString("queue1"),
|
new SimpleString("queue1"),
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
false);
|
false);
|
||||||
|
|
|
@ -622,4 +622,10 @@ public class FakeQueue implements Queue
|
||||||
{
|
{
|
||||||
return 0.0f;
|
return 0.0f;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SimpleString getUser()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -1410,6 +1410,7 @@ public class QueueImplTest extends UnitTestCase
|
||||||
QueueImplTest.address1,
|
QueueImplTest.address1,
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
|
null,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
false,
|
false,
|
||||||
|
|
|
@ -41,6 +41,7 @@ public class FakeQueueFactory implements QueueFactory
|
||||||
final SimpleString name,
|
final SimpleString name,
|
||||||
final Filter filter,
|
final Filter filter,
|
||||||
final PageSubscription subscription,
|
final PageSubscription subscription,
|
||||||
|
final SimpleString user,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
final boolean autoCreated)
|
final boolean autoCreated)
|
||||||
|
@ -50,6 +51,7 @@ public class FakeQueueFactory implements QueueFactory
|
||||||
name,
|
name,
|
||||||
filter,
|
filter,
|
||||||
subscription,
|
subscription,
|
||||||
|
user,
|
||||||
durable,
|
durable,
|
||||||
temporary,
|
temporary,
|
||||||
autoCreated,
|
autoCreated,
|
||||||
|
|
Loading…
Reference in New Issue