mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-23 10:52:56 +00:00
This closes #3245
This commit is contained in:
commit
31910b5ed9
@ -23,11 +23,22 @@ public class CompositeAddress {
|
||||
public static String SEPARATOR = "::";
|
||||
|
||||
public static String toFullyQualified(String address, String qName) {
|
||||
return new StringBuilder().append(address).append(SEPARATOR).append(qName).toString();
|
||||
return toFullyQualified(SimpleString.toSimpleString(address), SimpleString.toSimpleString(qName)).toString();
|
||||
}
|
||||
|
||||
public static SimpleString toFullyQualified(SimpleString address, SimpleString qName) {
|
||||
return address.concat(SEPARATOR).concat(qName);
|
||||
SimpleString result;
|
||||
if (address == null && qName == null) {
|
||||
result = null;
|
||||
} else if (address != null && qName == null) {
|
||||
result = address;
|
||||
} else if (address == null && qName != null) {
|
||||
result = qName;
|
||||
} else {
|
||||
result = address.concat(SEPARATOR).concat(qName);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public static boolean isFullyQualified(String address) {
|
||||
|
@ -383,7 +383,6 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||
|
||||
void checkDestination(ActiveMQDestination destination) throws JMSException {
|
||||
SimpleString address = destination.getSimpleAddress();
|
||||
// TODO: What to do with FQQN
|
||||
if (!destination.isCreated()) {
|
||||
try {
|
||||
ClientSession.AddressQuery addressQuery = session.addressQuery(address);
|
||||
@ -419,6 +418,19 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||
throw new InvalidDestinationException("Destination " + address + " does not exist, address exists but autoCreateQueues=" + addressQuery.isAutoCreateQueues());
|
||||
}
|
||||
}
|
||||
} else if (CompositeAddress.isFullyQualified(address)) { // it could be a topic using FQQN
|
||||
ClientSession.QueueQuery queueQuery = session.queueQuery(address);
|
||||
if (!queueQuery.isExists()) {
|
||||
if (addressQuery.isAutoCreateQueues()) {
|
||||
if (destination.isTemporary()) {
|
||||
createTemporaryQueue(destination, RoutingType.MULTICAST, address, null, addressQuery);
|
||||
} else {
|
||||
createQueue(destination, RoutingType.MULTICAST, address, null, true, true, addressQuery);
|
||||
}
|
||||
} else {
|
||||
throw new InvalidDestinationException("Destination " + address + " does not exist, address exists but autoCreateQueues=" + addressQuery.isAutoCreateQueues());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (ActiveMQQueueExistsException thatsOK) {
|
||||
// nothing to be done
|
||||
@ -837,7 +849,22 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||
|
||||
queueName = new SimpleString(UUID.randomUUID().toString());
|
||||
|
||||
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
|
||||
if (!CompositeAddress.isFullyQualified(dest.getAddress())) {
|
||||
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
|
||||
} else {
|
||||
if (!response.isExists() || !response.getQueueNames().contains(getCoreQueueName(dest))) {
|
||||
if (response.isAutoCreateQueues()) {
|
||||
try {
|
||||
createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), null, true, true, response);
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// The queue was created by another client/admin between the query check and send create queue packet
|
||||
}
|
||||
} else {
|
||||
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
|
||||
}
|
||||
}
|
||||
queueName = CompositeAddress.extractQueueName(dest.getSimpleAddress());
|
||||
}
|
||||
|
||||
consumer = createClientConsumer(dest, queueName, null);
|
||||
autoDeleteQueueName = queueName;
|
||||
|
@ -215,7 +215,7 @@ public class ReadOnlyContext implements Context, Serializable {
|
||||
}
|
||||
if (result == null) {
|
||||
int pos = name.indexOf(':');
|
||||
if (pos > 0) {
|
||||
if (pos > 0 && !name.contains("::")) {
|
||||
String scheme = name.substring(0, pos);
|
||||
Context ctx = NamingManager.getURLContext(scheme, environment);
|
||||
if (ctx == null) {
|
||||
|
@ -524,7 +524,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||
|
||||
private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
|
||||
if (queueName != null) {
|
||||
QueueQueryResult result = sessionSPI.queueQuery(queueName, routingType, false);
|
||||
QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), routingType, true);
|
||||
if (!result.isExists()) {
|
||||
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
|
||||
} else {
|
||||
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
||||
import org.apache.activemq.command.ConsumerControl;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
@ -213,7 +214,17 @@ public class AMQConsumer {
|
||||
session.getCoreSession().createQueue(new QueueConfiguration(queueName).setAddress(address).setFilterString(selector).setInternal(internalAddress));
|
||||
}
|
||||
} else {
|
||||
queueName = new SimpleString(UUID.randomUUID().toString());
|
||||
/*
|
||||
* The consumer may be using FQQN in which case the queue might already exist.
|
||||
*/
|
||||
if (CompositeAddress.isFullyQualified(physicalName)) {
|
||||
queueName = CompositeAddress.extractQueueName(SimpleString.toSimpleString(physicalName));
|
||||
if (session.getCoreServer().locateQueue(queueName) != null) {
|
||||
return queueName;
|
||||
}
|
||||
} else {
|
||||
queueName = new SimpleString(UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
session.getCoreSession().createQueue(new QueueConfiguration(queueName).setAddress(address).setFilterString(selector).setDurable(false).setTemporary(true).setInternal(internalAddress));
|
||||
}
|
||||
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager2;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager4;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||
import org.jboss.logging.Logger;
|
||||
@ -201,17 +202,28 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
|
||||
}
|
||||
|
||||
String user = session.getUsername();
|
||||
if (checkCached(address, user, checkType)) {
|
||||
// OK
|
||||
// bypass permission checks for management cluster user
|
||||
if (managementClusterUser.equals(user) && session.getPassword().equals(managementClusterPassword)) {
|
||||
return;
|
||||
}
|
||||
|
||||
String saddress = address.toString();
|
||||
|
||||
Set<Role> roles = securityRepository.getMatch(saddress);
|
||||
|
||||
// bypass permission checks for management cluster user
|
||||
if (managementClusterUser.equals(user) && session.getPassword().equals(managementClusterPassword)) {
|
||||
/*
|
||||
* If a valid queue is passed in and there's an exact match for the FQQN then use the FQQN instead of the address
|
||||
*/
|
||||
boolean isFullyQualified = false;
|
||||
SimpleString fqqn = null;
|
||||
if (queue != null) {
|
||||
fqqn = CompositeAddress.toFullyQualified(address, queue);
|
||||
if (securityRepository.containsExactMatch(fqqn.toString())) {
|
||||
roles = securityRepository.getMatch(fqqn.toString());
|
||||
isFullyQualified = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (checkCached(isFullyQualified ? fqqn : address, user, checkType)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -257,8 +269,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
|
||||
if (act != null) {
|
||||
set = act;
|
||||
}
|
||||
set.add(address);
|
||||
|
||||
set.add(isFullyQualified ? fqqn : address);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2195,13 +2195,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||
}
|
||||
|
||||
if (session != null) {
|
||||
|
||||
if (queue.isDurable()) {
|
||||
// make sure the user has privileges to delete this queue
|
||||
securityStore.check(address, queueName, CheckType.DELETE_DURABLE_QUEUE, session);
|
||||
} else {
|
||||
securityStore.check(address, queueName, CheckType.DELETE_NON_DURABLE_QUEUE, session);
|
||||
}
|
||||
// make sure the user has privileges to delete this queue
|
||||
securityStore.check(address, queueName, queue.isDurable() ? CheckType.DELETE_DURABLE_QUEUE : CheckType.DELETE_NON_DURABLE_QUEUE, session);
|
||||
}
|
||||
|
||||
// This check is only valid if checkConsumerCount == true
|
||||
|
@ -538,18 +538,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||
}
|
||||
|
||||
SimpleString address = removePrefix(binding.getAddress());
|
||||
if (browseOnly) {
|
||||
try {
|
||||
securityCheck(address, queueName, CheckType.BROWSE, this);
|
||||
} catch (Exception e) {
|
||||
securityCheck(address.concat(".").concat(unPrefixedQueueName), queueName, CheckType.BROWSE, this);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
securityCheck(address, queueName, CheckType.CONSUME, this);
|
||||
} catch (Exception e) {
|
||||
securityCheck(address.concat(".").concat(unPrefixedQueueName), queueName, CheckType.CONSUME, this);
|
||||
}
|
||||
try {
|
||||
securityCheck(address, unPrefixedQueueName, browseOnly ? CheckType.BROWSE : CheckType.CONSUME, this);
|
||||
} catch (Exception e) {
|
||||
// this is here for backwards compatibility with the pre-FQQN syntax from ARTEMIS-592
|
||||
securityCheck(address.concat(".").concat(unPrefixedQueueName), queueName, browseOnly ? CheckType.BROWSE : CheckType.CONSUME, this);
|
||||
}
|
||||
|
||||
Filter filter = FilterImpl.createFilter(filterString);
|
||||
@ -723,12 +716,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||
.setAddress(removePrefix(queueConfiguration.getAddress()))
|
||||
.setName(removePrefix(queueConfiguration.getName()));
|
||||
|
||||
if (queueConfiguration.isDurable()) {
|
||||
// make sure the user has privileges to create this queue
|
||||
securityCheck(queueConfiguration.getAddress(), queueConfiguration.getName(), CheckType.CREATE_DURABLE_QUEUE, this);
|
||||
} else {
|
||||
securityCheck(queueConfiguration.getAddress(), queueConfiguration.getName(), CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
||||
}
|
||||
// make sure the user has privileges to create this queue
|
||||
securityCheck(queueConfiguration.getAddress(), queueConfiguration.getName(), queueConfiguration.isDurable() ? CheckType.CREATE_DURABLE_QUEUE : CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
||||
|
||||
AddressSettings as = server.getAddressSettingsRepository().getMatch(queueConfiguration.getAddress().toString());
|
||||
|
||||
@ -1043,7 +1032,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||
}
|
||||
queueConfiguration.setAddress(removePrefix(queueConfiguration.getAddress()));
|
||||
|
||||
securityCheck(queueConfiguration.getAddress(), queueConfiguration.getName(), queueConfiguration.isDurable() == null || queueConfiguration.isDurable() ? CheckType.CREATE_DURABLE_QUEUE : CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
||||
securityCheck(queueConfiguration.getAddress(), queueConfiguration.getName(), queueConfiguration.isDurable() ? CheckType.CREATE_DURABLE_QUEUE : CheckType.CREATE_NON_DURABLE_QUEUE, this);
|
||||
|
||||
server.checkQueueCreationLimit(getUsername());
|
||||
|
||||
@ -2140,10 +2129,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||
|
||||
AddressInfo art = getAddressAndRoutingType(new AddressInfo(msg.getAddressSimpleString(), routingType));
|
||||
|
||||
// Consumer
|
||||
// check the user has write access to this address.
|
||||
try {
|
||||
securityCheck(art.getName(), CheckType.SEND, this);
|
||||
securityCheck(CompositeAddress.extractAddressName(art.getName()), CompositeAddress.isFullyQualified(art.getName()) ? CompositeAddress.extractQueueName(art.getName()) : null, CheckType.SEND, this);
|
||||
} catch (ActiveMQException e) {
|
||||
if (!autoCommitSends && tx != null) {
|
||||
tx.markAsRollbackOnly(e);
|
||||
|
@ -107,4 +107,6 @@ public interface HierarchicalRepository<T> {
|
||||
void clearCache();
|
||||
|
||||
int getCacheSize();
|
||||
|
||||
boolean containsExactMatch(String match);
|
||||
}
|
||||
|
@ -244,6 +244,11 @@ public class HierarchicalObjectRepository<T> implements HierarchicalRepository<T
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsExactMatch(String match) {
|
||||
return exactMatches.containsKey(match);
|
||||
}
|
||||
|
||||
/**
|
||||
* merge all the possible matches, if the values implement Mergeable then a full merge is done
|
||||
*
|
||||
|
@ -139,6 +139,36 @@ By not inheriting permissions, it allows you to effectively deny permissions in
|
||||
more specific security-setting blocks by simply not specifying them. Otherwise
|
||||
it would not be possible to deny permissions in sub-groups of addresses.
|
||||
|
||||
### Fine-grained security using fully qualified queue name
|
||||
|
||||
In certain situations it may be necessary to configure security that is more
|
||||
fine-grained that simply across an entire address. For example, consider an
|
||||
address with multiple queues:
|
||||
|
||||
```xml
|
||||
<addresses>
|
||||
<address name="foo">
|
||||
<anycast>
|
||||
<queue name="q1" />
|
||||
<queue name="q2" />
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
```
|
||||
|
||||
You may want to limit consumption from `q1` to one role and consumption from
|
||||
`q2` to another role. You can do this using the fully qualified queue name (i.e.
|
||||
fqqn") in the `match` of the `security-setting`, e.g.:
|
||||
|
||||
```xml
|
||||
<security-setting match="foo::q1">
|
||||
<permission type="consume" roles="q1Role"/>
|
||||
</security-setting>
|
||||
<security-setting match="foo::q2">
|
||||
<permission type="consume" roles="q2Role"/>
|
||||
</security-setting>
|
||||
```
|
||||
|
||||
## Security Setting Plugin
|
||||
|
||||
Aside from configuring sets of permissions via XML these permissions can
|
||||
|
@ -68,6 +68,7 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
|
||||
|
||||
@Test
|
||||
public void testFQQNTopicWhenQueueDoesNotExist() throws Exception {
|
||||
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(false));
|
||||
Exception e = null;
|
||||
String queueName = "testQueue";
|
||||
|
||||
@ -87,6 +88,47 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
|
||||
assertTrue(e.getMessage().contains("Queue: '" + queueName + "' does not exist"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopicFQQNSendAndConsumeAutoCreate() throws Exception {
|
||||
internalTopicFQQNSendAndConsume(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopicFQQNSendAndConsumeManualCreate() throws Exception {
|
||||
internalTopicFQQNSendAndConsume(false);
|
||||
}
|
||||
|
||||
private void internalTopicFQQNSendAndConsume(boolean autocreate) throws Exception {
|
||||
if (autocreate) {
|
||||
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true));
|
||||
} else {
|
||||
server.createQueue(new QueueConfiguration(anycastQ1).setAddress(multicastAddress).setDurable(false));
|
||||
}
|
||||
|
||||
try (Connection connection = createConnection(false)) {
|
||||
connection.setClientID("FQQNconn");
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic(CompositeAddress.toFullyQualified(multicastAddress, anycastQ1).toString());
|
||||
|
||||
MessageConsumer consumer1 = session.createConsumer(topic);
|
||||
MessageConsumer consumer2 = session.createConsumer(topic);
|
||||
MessageConsumer consumer3 = session.createConsumer(topic);
|
||||
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
|
||||
producer.send(session.createMessage());
|
||||
|
||||
//only 1 consumer receives the message as they're all connected to the same FQQN
|
||||
Message m = consumer1.receive(2000);
|
||||
assertNotNull(m);
|
||||
m = consumer2.receiveNoWait();
|
||||
assertNull(m);
|
||||
m = consumer3.receiveNoWait();
|
||||
assertNull(m);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumeQueueToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {
|
||||
|
||||
|
@ -60,6 +60,7 @@ import org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -475,6 +476,29 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
|
||||
Assert.assertTrue(destination instanceof Queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueFQQN() throws Exception {
|
||||
final String QUEUE = "myQueue";
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
|
||||
props.put("queue.myQueue", "myAddress::" + QUEUE);
|
||||
Context ctx = new InitialContext(props);
|
||||
liveService.getSecurityStore().setSecurityEnabled(false);
|
||||
|
||||
Destination destination = (Destination) ctx.lookup(QUEUE);
|
||||
Assert.assertTrue(destination instanceof Queue);
|
||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
try (Connection connection = connectionFactory.createConnection()) {
|
||||
Session session = connection.createSession();
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
producer.send(session.createMessage());
|
||||
Wait.assertTrue(() -> liveService.locateQueue(QUEUE).getMessageCount() == 1, 2000, 100);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
connection.start();
|
||||
assertNotNull(consumer.receiveNoWait());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicQueue() throws NamingException, JMSException {
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
@ -485,6 +509,28 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
|
||||
Assert.assertTrue(destination instanceof Queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicQueueFQQN() throws Exception {
|
||||
final String QUEUE = "myQueue";
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
|
||||
Context ctx = new InitialContext(props);
|
||||
liveService.getSecurityStore().setSecurityEnabled(false);
|
||||
|
||||
Destination destination = (Destination) ctx.lookup("dynamicQueues/myAddress::" + QUEUE);
|
||||
Assert.assertTrue(destination instanceof Queue);
|
||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
try (Connection connection = connectionFactory.createConnection()) {
|
||||
Session session = connection.createSession();
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
producer.send(session.createMessage());
|
||||
Wait.assertTrue(() -> liveService.locateQueue(QUEUE).getMessageCount() == 1, 2000, 100);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
connection.start();
|
||||
assertNotNull(consumer.receiveNoWait());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopic() throws NamingException, JMSException {
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
@ -500,6 +546,28 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
|
||||
Assert.assertTrue(destination instanceof Topic);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopicFQQN() throws Exception {
|
||||
final String SUBSCRIPTION = "mySubsription";
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
|
||||
props.put("topic.myTopic", "myTopic::" + SUBSCRIPTION);
|
||||
Context ctx = new InitialContext(props);
|
||||
liveService.getSecurityStore().setSecurityEnabled(false);
|
||||
|
||||
Destination destination = (Destination) ctx.lookup("myTopic");
|
||||
Assert.assertTrue(destination instanceof Topic);
|
||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
Connection connection = connectionFactory.createConnection();
|
||||
Session session = connection.createSession();
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
producer.send(session.createMessage());
|
||||
Wait.assertTrue(() -> liveService.locateQueue(SUBSCRIPTION).getMessageCount() == 1, 2000, 100);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
connection.start();
|
||||
assertNotNull(consumer.receiveNoWait());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicTopic() throws NamingException, JMSException {
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
@ -510,6 +578,28 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
|
||||
Assert.assertTrue(destination instanceof Topic);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicTopicFQQN() throws Exception {
|
||||
final String SUBSCRIPTION = "mySubsription";
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
|
||||
props.put("topic.myTopic", "myTopic::" + SUBSCRIPTION);
|
||||
Context ctx = new InitialContext(props);
|
||||
liveService.getSecurityStore().setSecurityEnabled(false);
|
||||
|
||||
Destination destination = (Destination) ctx.lookup("dynamicTopics/myTopic::" + SUBSCRIPTION);
|
||||
Assert.assertTrue(destination instanceof Topic);
|
||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
Connection connection = connectionFactory.createConnection();
|
||||
Session session = connection.createSession();
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
producer.send(session.createMessage());
|
||||
Wait.assertTrue(() -> liveService.locateQueue(SUBSCRIPTION).getMessageCount() == 1, 2000, 100);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
connection.start();
|
||||
assertNotNull(consumer.receiveNoWait());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoteCFWithTCPUserPassword() throws Exception {
|
||||
|
||||
|
@ -123,6 +123,47 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopicFQQNSendAndConsumeAutoCreate() throws Exception {
|
||||
internalTopicFQQNSendAndConsume(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopicFQQNSendAndConsumeManualCreate() throws Exception {
|
||||
internalTopicFQQNSendAndConsume(false);
|
||||
}
|
||||
|
||||
private void internalTopicFQQNSendAndConsume(boolean autocreate) throws Exception {
|
||||
if (autocreate) {
|
||||
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true));
|
||||
} else {
|
||||
server.createQueue(new QueueConfiguration(anycastQ1).setAddress(multicastAddress).setDurable(false));
|
||||
}
|
||||
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
connection.setClientID("FQQNconn");
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic(CompositeAddress.toFullyQualified(multicastAddress, anycastQ1).toString());
|
||||
|
||||
MessageConsumer consumer1 = session.createConsumer(topic);
|
||||
MessageConsumer consumer2 = session.createConsumer(topic);
|
||||
MessageConsumer consumer3 = session.createConsumer(topic);
|
||||
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
|
||||
producer.send(session.createMessage());
|
||||
|
||||
//only 1 consumer receives the message as they're all connected to the same FQQN
|
||||
Message m = consumer1.receive(2000);
|
||||
assertNotNull(m);
|
||||
m = consumer2.receiveNoWait();
|
||||
assertNull(m);
|
||||
m = consumer3.receiveNoWait();
|
||||
assertNull(m);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueConsumerReceiveTopicUsingFQQN() throws Exception {
|
||||
|
||||
|
@ -65,6 +65,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager4;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.CreateMessage;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -509,7 +510,17 @@ public class SecurityTest extends ActiveMQTestBase {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJAASSecurityManagerAuthorizationSameAddressDifferentQueues() throws Exception {
|
||||
// this is for backwards compatibility with the pre-FQQN syntax from ARTEMIS-592
|
||||
public void testJAASSecurityManagerAuthorizationSameAddressDifferentQueuesDotSyntax() throws Exception {
|
||||
internalJAASSecurityManagerAuthorizationSameAddressDifferentQueues(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJAASSecurityManagerAuthorizationSameAddressDifferentQueuesFqqnSyntax() throws Exception {
|
||||
internalJAASSecurityManagerAuthorizationSameAddressDifferentQueues(true);
|
||||
}
|
||||
|
||||
private void internalJAASSecurityManagerAuthorizationSameAddressDifferentQueues(boolean fqqnSyntax) throws Exception {
|
||||
final SimpleString ADDRESS = new SimpleString("address");
|
||||
final SimpleString QUEUE_A = new SimpleString("a");
|
||||
final SimpleString QUEUE_B = new SimpleString("b");
|
||||
@ -518,10 +529,18 @@ public class SecurityTest extends ActiveMQTestBase {
|
||||
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false));
|
||||
Set<Role> aRoles = new HashSet<>();
|
||||
aRoles.add(new Role(QUEUE_A.toString(), false, true, false, false, false, false, false, false, false, false));
|
||||
server.getConfiguration().putSecurityRoles(ADDRESS.concat(".").concat(QUEUE_A).toString(), aRoles);
|
||||
if (fqqnSyntax) {
|
||||
server.getConfiguration().putSecurityRoles(CompositeAddress.toFullyQualified(ADDRESS, QUEUE_A).toString(), aRoles);
|
||||
} else {
|
||||
server.getConfiguration().putSecurityRoles(ADDRESS.concat(".").concat(QUEUE_A).toString(), aRoles);
|
||||
}
|
||||
Set<Role> bRoles = new HashSet<>();
|
||||
bRoles.add(new Role(QUEUE_B.toString(), false, true, false, false, false, false, false, false, false, false));
|
||||
server.getConfiguration().putSecurityRoles(ADDRESS.concat(".").concat(QUEUE_B).toString(), bRoles);
|
||||
if (fqqnSyntax) {
|
||||
server.getConfiguration().putSecurityRoles(CompositeAddress.toFullyQualified(ADDRESS, QUEUE_B).toString(), bRoles);
|
||||
} else {
|
||||
server.getConfiguration().putSecurityRoles(ADDRESS.concat(".").concat(QUEUE_B).toString(), bRoles);
|
||||
}
|
||||
server.start();
|
||||
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||
server.createQueue(new QueueConfiguration(QUEUE_A).setAddress(ADDRESS).setRoutingType(RoutingType.ANYCAST));
|
||||
|
Loading…
x
Reference in New Issue
Block a user