From a9d9731f0a644ff5bf6fb65583f1fabc832d9ae5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Fri, 18 May 2018 22:09:32 +0100 Subject: [PATCH] ARTEMIS-1872 Check for queue exists before creating shared queue 1. Add tests case to verify issue and fix, tests also tests for same behavior using CORE, OPENWIRE and AMQP JMS Clients. 2. Update Core Client to check for queue before creating, sharedQueue as per createQueue logic. 3. Update ServerSessionPacketHandler to handle packets from old clients to perform to implement the same fix server side for older clients. 4. Correct AMQP protocol so correct error code is returned on security exception so that amqp jms can correctly throw JMSsecurityException 5. Correct AMQP protocol to check for queue exists before create 6. Correct OpenWire protocol to check for address exists before create --- .../core/protocol/core/impl/PacketImpl.java | 2 + .../artemis/jms/client/ActiveMQSession.java | 23 +- .../amqp/broker/AMQPSessionCallback.java | 27 +- .../proton/ProtonServerSenderContext.java | 6 +- .../protocol/openwire/OpenWireConnection.java | 6 +- .../core/ServerSessionPacketHandler.java | 11 +- .../server/SecureConfigurationTest.java | 260 ++++++++++++++++++ .../src/test/resources/multicast_topic.xml | 146 ++++++++++ 8 files changed, 459 insertions(+), 22 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SecureConfigurationTest.java create mode 100644 tests/integration-tests/src/test/resources/multicast_topic.xml diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 684ca5c24d..c275e213b1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -31,6 +31,8 @@ public class PacketImpl implements Packet { // 2.0.0 public static final int ADDRESSING_CHANGE_VERSION = 129; + public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130; + public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic."); diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 5f29211319..3149ff03fa 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -46,6 +46,7 @@ import javax.transaction.xa.XAResource; import java.io.Serializable; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -630,16 +631,20 @@ public class ActiveMQSession implements QueueSession, TopicSession { queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName); - try { - if (durability == ConsumerDurability.DURABLE) { - createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); - } else { - createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); + QueueQuery subResponse = session.queueQuery(queueName); + + if (!(subResponse.isExists() && Objects.equals(subResponse.getAddress(), dest.getSimpleAddress()) && Objects.equals(subResponse.getFilterString(), coreFilterString))) { + try { + if (durability == ConsumerDurability.DURABLE) { + createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); + } else { + createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); + } + } catch (ActiveMQQueueExistsException ignored) { + // We ignore this because querying and then creating the queue wouldn't be idempotent + // we could also add a parameter to ignore existence what would require a bigger work around to avoid + // compatibility. } - } catch (ActiveMQQueueExistsException ignored) { - // We ignore this because querying and then creating the queue wouldn't be idempotent - // we could also add a parameter to ignore existence what would require a bigger work around to avoid - // compatibility. } consumer = session.createConsumer(queueName, null, false); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 105d58adc1..1301f0bfb8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -237,35 +238,51 @@ public class AMQPSessionCallback implements SessionCallback { } public void createTemporaryQueue(SimpleString queueName, RoutingType routingType) throws Exception { - serverSession.createQueue(queueName, queueName, routingType, null, true, false); + createTemporaryQueue(queueName, queueName, routingType, null); } public void createTemporaryQueue(SimpleString address, SimpleString queueName, RoutingType routingType, SimpleString filter) throws Exception { - serverSession.createQueue(address, queueName, routingType, filter, true, false); + try { + serverSession.createQueue(address, queueName, routingType, filter, true, false); + } catch (ActiveMQSecurityException se) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage()); + } } public void createUnsharedDurableQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter) throws Exception { - serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false); + try { + serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false); + } catch (ActiveMQSecurityException se) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage()); + } } public void createSharedDurableQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter) throws Exception { - serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false); + try { + serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false); + } catch (ActiveMQSecurityException se) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage()); + } } public void createSharedVolatileQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter) throws Exception { - serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true); + try { + serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true); + } catch (ActiveMQSecurityException se) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage()); + } } public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 9b4704f765..ddd9b39e84 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -23,7 +23,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -370,10 +369,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr isVolatile = true; if (shared && sender.getName() != null) { queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile); - try { + QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); + if (!(result.isExists() && Objects.equals(result.getAddress(), addressToUse) && Objects.equals(result.getFilterString(), simpleStringSelector))) { sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); - } catch (ActiveMQQueueExistsException e) { - //this is ok, just means its shared } } else { queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString()); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 21b2d46456..225aac4aee 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -812,8 +812,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() || dest.isTemporary())) { try { - internalSession.createAddress(addressInfo, !dest.isTemporary()); - created = true; + if (internalSession.getAddress(addressInfo.getName()) == null) { + internalSession.createAddress(addressInfo, !dest.isTemporary()); + created = true; + } } catch (ActiveMQAddressExistsException exists) { // The address may have been created by another thread in the mean time. Catch and do nothing. } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index edfd566c15..36273f8c2b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.List; +import java.util.Objects; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -362,7 +363,10 @@ public class ServerSessionPacketHandler implements ChannelHandler { case CREATE_SHARED_QUEUE: { CreateSharedQueueMessage request = (CreateSharedQueueMessage) packet; requiresResponse = request.isRequiresResponse(); - session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString()); + QueueQueryResult result = session.executeQueueQuery(request.getQueueName()); + if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) { + session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString()); + } if (requiresResponse) { response = new NullResponseMessage(); } @@ -371,7 +375,10 @@ public class ServerSessionPacketHandler implements ChannelHandler { case CREATE_SHARED_QUEUE_V2: { CreateSharedQueueMessage_V2 request = (CreateSharedQueueMessage_V2) packet; requiresResponse = request.isRequiresResponse(); - session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue()); + QueueQueryResult result = session.executeQueueQuery(request.getQueueName()); + if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) { + session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue()); + } if (requiresResponse) { response = new NullResponseMessage(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SecureConfigurationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SecureConfigurationTest.java new file mode 100644 index 0000000000..fd0a12efdb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SecureConfigurationTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.server; + +import org.apache.activemq.artemis.core.config.FileDeploymentManager; +import org.apache.activemq.artemis.core.config.impl.FileConfiguration; +import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.JMSSecurityException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.util.Arrays; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class SecureConfigurationTest extends ActiveMQTestBase { + + @Parameterized.Parameters(name = "{index}: protocol={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {"CORE"}, {"AMQP"}, {"OPENWIRE"} + }); + } + + /* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */ + @Parameterized.Parameter(0) + public String protocol; + + @Test + public void testSecureSharedDurableSubscriber() throws Exception { + //This is because OpenWire does not support JMS 2.0 + Assume.assumeFalse(protocol.equals("OPENWIRE")); + + ActiveMQServer server = getActiveMQServer("multicast_topic.xml"); + try { + server.start(); + internal_testSecureSharedDurableSubscriber(getConnectionFactory("b", "b")); + } finally { + try { + server.stop(); + } catch (Exception e) { + } + } + } + + private void internal_testSecureSharedDurableSubscriber(ConnectionFactory connectionFactory) throws JMSException { + String message = "blah"; + + //Expect to be able to create subscriber on pre-defined/existing queue. + String messageRecieved = sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared_durable", (t, s) -> s.createSharedDurableConsumer(t, "secured_topic_shared_durable/queue")); + Assert.assertEquals(message, messageRecieved); + + try { + sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared_durable", (t, s) -> s.createSharedDurableConsumer(t, "secured_topic_shared_durable/non-existant-queue")); + Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue"); + } catch (JMSSecurityException j) { + //Expected exception + } + + try { + sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared_durable", (t, s) -> s.createSharedDurableConsumer(t, "secured_topic_shared_durable/queue", "age < 10")); + Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue"); + } catch (JMSSecurityException j) { + //Expected exception + } + } + + @Test + public void testSecureSharedSubscriber() throws Exception { + //This is because OpenWire does not support JMS 2.0 + Assume.assumeFalse(protocol.equals("OPENWIRE")); + + ActiveMQServer server = getActiveMQServer("multicast_topic.xml"); + try { + server.start(); + internal_testSecureSharedSubscriber(getConnectionFactory("b", "b")); + } finally { + try { + server.stop(); + } catch (Exception e) { + } + } + } + + private void internal_testSecureSharedSubscriber(ConnectionFactory connectionFactory) throws JMSException { + String message = "blah"; + + //Expect to be able to create subscriber on pre-defined/existing queue. + String messageRecieved = sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared", (t, s) -> s.createSharedConsumer(t, "secured_topic_shared/queue")); + Assert.assertEquals(message, messageRecieved); + + try { + sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared", (t, s) -> s.createSharedConsumer(t, "secured_topic_shared/non-existant-queue")); + Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue"); + } catch (JMSSecurityException j) { + //Expected exception + } + + try { + sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared", (t, s) -> s.createSharedConsumer(t, "secured_topic_shared/queue", "age < 10")); + Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue"); + } catch (JMSSecurityException j) { + //Expected exception + } + } + + @Test + public void testSecureDurableSubscriber() throws Exception { + ActiveMQServer server = getActiveMQServer("multicast_topic.xml"); + try { + server.start(); + internal_testSecureDurableSubscriber(getConnectionFactory("b", "b")); + } finally { + try { + server.stop(); + } catch (Exception e) { + } + } + } + + private void internal_testSecureDurableSubscriber(ConnectionFactory connectionFactory) throws JMSException { + String message = "blah"; + + //Expect to be able to create subscriber on pre-defined/existing queue. + String messageRecieved = sendAndReceiveText(connectionFactory, "clientId", message, "secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, "secured_topic_durable/queue")); + Assert.assertEquals(message, messageRecieved); + + try { + sendAndReceiveText(connectionFactory, "clientId", message, "secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, "secured_topic_durable/non-existant-queue")); + Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue"); + } catch (JMSSecurityException j) { + //Expected exception + } + + try { + sendAndReceiveText(connectionFactory, "clientId", message, "secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, "secured_topic_durable/queue", "age < 10", false)); + Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue"); + } catch (JMSSecurityException j) { + //Expected exception + } + + try { + sendAndReceiveText(connectionFactory, "clientId", message, "secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, "secured_topic_durable/queue", "age < 10", true)); + Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue"); + } catch (JMSSecurityException j) { + //Expected exception + } + } + + private ConnectionFactory getConnectionFactory(String user, String password) { + switch (protocol) { + case "CORE": return getActiveMQConnectionFactory(user, password); + case "AMQP" : return getAMQPConnectionFactory(user, password); + case "OPENWIRE": return getOpenWireConnectionFactory(user, password); + default: throw new IllegalStateException("Unsupported Protocol"); + } + } + + private ActiveMQConnectionFactory getActiveMQConnectionFactory(String user, String password) { + ActiveMQConnectionFactory activeMQConnection = new ActiveMQConnectionFactory("tcp://localhost:61616"); + activeMQConnection.setUser(user); + activeMQConnection.setPassword(password); + return activeMQConnection; + } + + private JmsConnectionFactory getAMQPConnectionFactory(String user, String password) { + JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory("amqp://localhost:61616"); + jmsConnectionFactory.setUsername(user); + jmsConnectionFactory.setPassword(password); + return jmsConnectionFactory; + } + + private org.apache.activemq.ActiveMQConnectionFactory getOpenWireConnectionFactory(String user, String password) { + org.apache.activemq.ActiveMQConnectionFactory activeMQConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); + activeMQConnectionFactory.setUserName(user); + activeMQConnectionFactory.setPassword(password); + return activeMQConnectionFactory; + } + + private String sendAndReceiveText(ConnectionFactory connectionFactory, String clientId, String message, String topicName, ConsumerSupplier consumerSupplier) throws JMSException { + String messageRecieved; + try (Connection connection = connectionFactory.createConnection()) { + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } + connection.start(); + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Topic topic = session.createTopic(topicName); + MessageConsumer messageConsumer = consumerSupplier.create(topic, session); + messageConsumer.receive(1000); + + TextMessage messageToSend = session.createTextMessage(message); + session.createProducer(topic).send(messageToSend); + + TextMessage received = (TextMessage) messageConsumer.receive(1000); + messageRecieved = received != null ? received.getText() : null; + } + } + return messageRecieved; + } + + protected ActiveMQServer getActiveMQServer(String brokerConfig) throws Exception { + FileConfiguration fc = new FileConfiguration(); + FileJMSConfiguration fileConfiguration = new FileJMSConfiguration(); + FileDeploymentManager deploymentManager = new FileDeploymentManager(brokerConfig); + deploymentManager.addDeployable(fc); + deploymentManager.addDeployable(fileConfiguration); + deploymentManager.readConfiguration(); + + + SecurityConfiguration securityConfiguration = new SecurityConfiguration(); + securityConfiguration.addUser("a", "a"); + securityConfiguration.addRole("a", "a"); + + securityConfiguration.addUser("b", "b"); + securityConfiguration.addRole("b", "b"); + + + ActiveMQJAASSecurityManager sm = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), securityConfiguration); + + return addServer(new ActiveMQServerImpl(fc, sm)); + } + + private interface ConsumerSupplier { + MessageConsumer create(Topic topic, Session session) throws JMSException; + } + +} diff --git a/tests/integration-tests/src/test/resources/multicast_topic.xml b/tests/integration-tests/src/test/resources/multicast_topic.xml new file mode 100644 index 0000000000..cf5430e352 --- /dev/null +++ b/tests/integration-tests/src/test/resources/multicast_topic.xml @@ -0,0 +1,146 @@ + + + + + + + + + 0.0.0.0 + + 100 + + false + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + 2 + + -1 + + true + + + 40000 + + +
+ + + +
+ +
+ + + +
+ +
+ + + +
+
+ + + tcp://localhost:61616 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + false + DLQ + ExpiryQueue + 0 + 10Mb + 10 + BLOCK + + +
+