mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-03-04 08:19:56 +00:00
This closes #2093
This commit is contained in:
commit
88b23994c3
@ -31,6 +31,8 @@ public class PacketImpl implements Packet {
|
||||
|
||||
// 2.0.0
|
||||
public static final int ADDRESSING_CHANGE_VERSION = 129;
|
||||
public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
|
||||
|
||||
|
||||
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
|
||||
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
|
||||
|
@ -46,6 +46,7 @@ import javax.transaction.xa.XAResource;
|
||||
import java.io.Serializable;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -630,16 +631,20 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||
|
||||
queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName);
|
||||
|
||||
try {
|
||||
if (durability == ConsumerDurability.DURABLE) {
|
||||
createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
|
||||
} else {
|
||||
createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
|
||||
QueueQuery subResponse = session.queueQuery(queueName);
|
||||
|
||||
if (!(subResponse.isExists() && Objects.equals(subResponse.getAddress(), dest.getSimpleAddress()) && Objects.equals(subResponse.getFilterString(), coreFilterString))) {
|
||||
try {
|
||||
if (durability == ConsumerDurability.DURABLE) {
|
||||
createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
|
||||
} else {
|
||||
createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
|
||||
}
|
||||
} catch (ActiveMQQueueExistsException ignored) {
|
||||
// We ignore this because querying and then creating the queue wouldn't be idempotent
|
||||
// we could also add a parameter to ignore existence what would require a bigger work around to avoid
|
||||
// compatibility.
|
||||
}
|
||||
} catch (ActiveMQQueueExistsException ignored) {
|
||||
// We ignore this because querying and then creating the queue wouldn't be idempotent
|
||||
// we could also add a parameter to ignore existence what would require a bigger work around to avoid
|
||||
// compatibility.
|
||||
}
|
||||
|
||||
consumer = session.createConsumer(queueName, null, false);
|
||||
|
@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
@ -237,35 +238,51 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||
}
|
||||
|
||||
public void createTemporaryQueue(SimpleString queueName, RoutingType routingType) throws Exception {
|
||||
serverSession.createQueue(queueName, queueName, routingType, null, true, false);
|
||||
createTemporaryQueue(queueName, queueName, routingType, null);
|
||||
}
|
||||
|
||||
public void createTemporaryQueue(SimpleString address,
|
||||
SimpleString queueName,
|
||||
RoutingType routingType,
|
||||
SimpleString filter) throws Exception {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, true, false);
|
||||
try {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, true, false);
|
||||
} catch (ActiveMQSecurityException se) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void createUnsharedDurableQueue(SimpleString address,
|
||||
RoutingType routingType,
|
||||
SimpleString queueName,
|
||||
SimpleString filter) throws Exception {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false);
|
||||
try {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false);
|
||||
} catch (ActiveMQSecurityException se) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void createSharedDurableQueue(SimpleString address,
|
||||
RoutingType routingType,
|
||||
SimpleString queueName,
|
||||
SimpleString filter) throws Exception {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false);
|
||||
try {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false);
|
||||
} catch (ActiveMQSecurityException se) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void createSharedVolatileQueue(SimpleString address,
|
||||
RoutingType routingType,
|
||||
SimpleString queueName,
|
||||
SimpleString filter) throws Exception {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true);
|
||||
try {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true);
|
||||
} catch (ActiveMQSecurityException se) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception {
|
||||
|
@ -23,7 +23,6 @@ import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
@ -370,10 +369,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||
isVolatile = true;
|
||||
if (shared && sender.getName() != null) {
|
||||
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
|
||||
try {
|
||||
QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
|
||||
if (!(result.isExists() && Objects.equals(result.getAddress(), addressToUse) && Objects.equals(result.getFilterString(), simpleStringSelector))) {
|
||||
sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
//this is ok, just means its shared
|
||||
}
|
||||
} else {
|
||||
queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
|
||||
|
@ -812,8 +812,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||
}
|
||||
} else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() || dest.isTemporary())) {
|
||||
try {
|
||||
internalSession.createAddress(addressInfo, !dest.isTemporary());
|
||||
created = true;
|
||||
if (internalSession.getAddress(addressInfo.getName()) == null) {
|
||||
internalSession.createAddress(addressInfo, !dest.isTemporary());
|
||||
created = true;
|
||||
}
|
||||
} catch (ActiveMQAddressExistsException exists) {
|
||||
// The address may have been created by another thread in the mean time. Catch and do nothing.
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
@ -362,7 +363,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||
case CREATE_SHARED_QUEUE: {
|
||||
CreateSharedQueueMessage request = (CreateSharedQueueMessage) packet;
|
||||
requiresResponse = request.isRequiresResponse();
|
||||
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString());
|
||||
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
|
||||
if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
|
||||
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString());
|
||||
}
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
}
|
||||
@ -371,7 +375,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||
case CREATE_SHARED_QUEUE_V2: {
|
||||
CreateSharedQueueMessage_V2 request = (CreateSharedQueueMessage_V2) packet;
|
||||
requiresResponse = request.isRequiresResponse();
|
||||
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue());
|
||||
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
|
||||
if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
|
||||
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue());
|
||||
}
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
}
|
||||
|
@ -0,0 +1,260 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.server;
|
||||
|
||||
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
||||
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.JMSSecurityException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class SecureConfigurationTest extends ActiveMQTestBase {
|
||||
|
||||
@Parameterized.Parameters(name = "{index}: protocol={0}")
|
||||
public static Collection<Object[]> parameters() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{"CORE"}, {"AMQP"}, {"OPENWIRE"}
|
||||
});
|
||||
}
|
||||
|
||||
/* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */
|
||||
@Parameterized.Parameter(0)
|
||||
public String protocol;
|
||||
|
||||
@Test
|
||||
public void testSecureSharedDurableSubscriber() throws Exception {
|
||||
//This is because OpenWire does not support JMS 2.0
|
||||
Assume.assumeFalse(protocol.equals("OPENWIRE"));
|
||||
|
||||
ActiveMQServer server = getActiveMQServer("multicast_topic.xml");
|
||||
try {
|
||||
server.start();
|
||||
internal_testSecureSharedDurableSubscriber(getConnectionFactory("b", "b"));
|
||||
} finally {
|
||||
try {
|
||||
server.stop();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void internal_testSecureSharedDurableSubscriber(ConnectionFactory connectionFactory) throws JMSException {
|
||||
String message = "blah";
|
||||
|
||||
//Expect to be able to create subscriber on pre-defined/existing queue.
|
||||
String messageRecieved = sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared_durable", (t, s) -> s.createSharedDurableConsumer(t, "secured_topic_shared_durable/queue"));
|
||||
Assert.assertEquals(message, messageRecieved);
|
||||
|
||||
try {
|
||||
sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared_durable", (t, s) -> s.createSharedDurableConsumer(t, "secured_topic_shared_durable/non-existant-queue"));
|
||||
Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue");
|
||||
} catch (JMSSecurityException j) {
|
||||
//Expected exception
|
||||
}
|
||||
|
||||
try {
|
||||
sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared_durable", (t, s) -> s.createSharedDurableConsumer(t, "secured_topic_shared_durable/queue", "age < 10"));
|
||||
Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue");
|
||||
} catch (JMSSecurityException j) {
|
||||
//Expected exception
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecureSharedSubscriber() throws Exception {
|
||||
//This is because OpenWire does not support JMS 2.0
|
||||
Assume.assumeFalse(protocol.equals("OPENWIRE"));
|
||||
|
||||
ActiveMQServer server = getActiveMQServer("multicast_topic.xml");
|
||||
try {
|
||||
server.start();
|
||||
internal_testSecureSharedSubscriber(getConnectionFactory("b", "b"));
|
||||
} finally {
|
||||
try {
|
||||
server.stop();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void internal_testSecureSharedSubscriber(ConnectionFactory connectionFactory) throws JMSException {
|
||||
String message = "blah";
|
||||
|
||||
//Expect to be able to create subscriber on pre-defined/existing queue.
|
||||
String messageRecieved = sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared", (t, s) -> s.createSharedConsumer(t, "secured_topic_shared/queue"));
|
||||
Assert.assertEquals(message, messageRecieved);
|
||||
|
||||
try {
|
||||
sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared", (t, s) -> s.createSharedConsumer(t, "secured_topic_shared/non-existant-queue"));
|
||||
Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue");
|
||||
} catch (JMSSecurityException j) {
|
||||
//Expected exception
|
||||
}
|
||||
|
||||
try {
|
||||
sendAndReceiveText(connectionFactory, null, message, "secured_topic_shared", (t, s) -> s.createSharedConsumer(t, "secured_topic_shared/queue", "age < 10"));
|
||||
Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue");
|
||||
} catch (JMSSecurityException j) {
|
||||
//Expected exception
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecureDurableSubscriber() throws Exception {
|
||||
ActiveMQServer server = getActiveMQServer("multicast_topic.xml");
|
||||
try {
|
||||
server.start();
|
||||
internal_testSecureDurableSubscriber(getConnectionFactory("b", "b"));
|
||||
} finally {
|
||||
try {
|
||||
server.stop();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void internal_testSecureDurableSubscriber(ConnectionFactory connectionFactory) throws JMSException {
|
||||
String message = "blah";
|
||||
|
||||
//Expect to be able to create subscriber on pre-defined/existing queue.
|
||||
String messageRecieved = sendAndReceiveText(connectionFactory, "clientId", message, "secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, "secured_topic_durable/queue"));
|
||||
Assert.assertEquals(message, messageRecieved);
|
||||
|
||||
try {
|
||||
sendAndReceiveText(connectionFactory, "clientId", message, "secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, "secured_topic_durable/non-existant-queue"));
|
||||
Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue");
|
||||
} catch (JMSSecurityException j) {
|
||||
//Expected exception
|
||||
}
|
||||
|
||||
try {
|
||||
sendAndReceiveText(connectionFactory, "clientId", message, "secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, "secured_topic_durable/queue", "age < 10", false));
|
||||
Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue");
|
||||
} catch (JMSSecurityException j) {
|
||||
//Expected exception
|
||||
}
|
||||
|
||||
try {
|
||||
sendAndReceiveText(connectionFactory, "clientId", message, "secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, "secured_topic_durable/queue", "age < 10", true));
|
||||
Assert.fail("Security exception expected, but did not occur, excepetion expected as not permissioned to dynamically create queue");
|
||||
} catch (JMSSecurityException j) {
|
||||
//Expected exception
|
||||
}
|
||||
}
|
||||
|
||||
private ConnectionFactory getConnectionFactory(String user, String password) {
|
||||
switch (protocol) {
|
||||
case "CORE": return getActiveMQConnectionFactory(user, password);
|
||||
case "AMQP" : return getAMQPConnectionFactory(user, password);
|
||||
case "OPENWIRE": return getOpenWireConnectionFactory(user, password);
|
||||
default: throw new IllegalStateException("Unsupported Protocol");
|
||||
}
|
||||
}
|
||||
|
||||
private ActiveMQConnectionFactory getActiveMQConnectionFactory(String user, String password) {
|
||||
ActiveMQConnectionFactory activeMQConnection = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
activeMQConnection.setUser(user);
|
||||
activeMQConnection.setPassword(password);
|
||||
return activeMQConnection;
|
||||
}
|
||||
|
||||
private JmsConnectionFactory getAMQPConnectionFactory(String user, String password) {
|
||||
JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
|
||||
jmsConnectionFactory.setUsername(user);
|
||||
jmsConnectionFactory.setPassword(password);
|
||||
return jmsConnectionFactory;
|
||||
}
|
||||
|
||||
private org.apache.activemq.ActiveMQConnectionFactory getOpenWireConnectionFactory(String user, String password) {
|
||||
org.apache.activemq.ActiveMQConnectionFactory activeMQConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
activeMQConnectionFactory.setUserName(user);
|
||||
activeMQConnectionFactory.setPassword(password);
|
||||
return activeMQConnectionFactory;
|
||||
}
|
||||
|
||||
private String sendAndReceiveText(ConnectionFactory connectionFactory, String clientId, String message, String topicName, ConsumerSupplier consumerSupplier) throws JMSException {
|
||||
String messageRecieved;
|
||||
try (Connection connection = connectionFactory.createConnection()) {
|
||||
if (clientId != null && !clientId.isEmpty()) {
|
||||
connection.setClientID(clientId);
|
||||
}
|
||||
connection.start();
|
||||
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
|
||||
Topic topic = session.createTopic(topicName);
|
||||
MessageConsumer messageConsumer = consumerSupplier.create(topic, session);
|
||||
messageConsumer.receive(1000);
|
||||
|
||||
TextMessage messageToSend = session.createTextMessage(message);
|
||||
session.createProducer(topic).send(messageToSend);
|
||||
|
||||
TextMessage received = (TextMessage) messageConsumer.receive(1000);
|
||||
messageRecieved = received != null ? received.getText() : null;
|
||||
}
|
||||
}
|
||||
return messageRecieved;
|
||||
}
|
||||
|
||||
protected ActiveMQServer getActiveMQServer(String brokerConfig) throws Exception {
|
||||
FileConfiguration fc = new FileConfiguration();
|
||||
FileJMSConfiguration fileConfiguration = new FileJMSConfiguration();
|
||||
FileDeploymentManager deploymentManager = new FileDeploymentManager(brokerConfig);
|
||||
deploymentManager.addDeployable(fc);
|
||||
deploymentManager.addDeployable(fileConfiguration);
|
||||
deploymentManager.readConfiguration();
|
||||
|
||||
|
||||
SecurityConfiguration securityConfiguration = new SecurityConfiguration();
|
||||
securityConfiguration.addUser("a", "a");
|
||||
securityConfiguration.addRole("a", "a");
|
||||
|
||||
securityConfiguration.addUser("b", "b");
|
||||
securityConfiguration.addRole("b", "b");
|
||||
|
||||
|
||||
ActiveMQJAASSecurityManager sm = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), securityConfiguration);
|
||||
|
||||
return addServer(new ActiveMQServerImpl(fc, sm));
|
||||
}
|
||||
|
||||
private interface ConsumerSupplier {
|
||||
MessageConsumer create(Topic topic, Session session) throws JMSException;
|
||||
}
|
||||
|
||||
}
|
146
tests/integration-tests/src/test/resources/multicast_topic.xml
Normal file
146
tests/integration-tests/src/test/resources/multicast_topic.xml
Normal file
@ -0,0 +1,146 @@
|
||||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
|
||||
<security-enabled>true</security-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO or NIO
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<amqp-use-core-subscription-naming>true</amqp-use-core-subscription-naming>
|
||||
|
||||
<!--
|
||||
This value was determined through a calculation.
|
||||
Your system could perform 25 writes per millisecond
|
||||
on the current journal configuration.
|
||||
That translates as a sync write every 40000 nanoseconds
|
||||
-->
|
||||
<journal-buffer-timeout>40000</journal-buffer-timeout>
|
||||
|
||||
<addresses>
|
||||
<address name="secured_topic_shared_durable">
|
||||
<multicast>
|
||||
<queue name="secured_topic_shared_durable/queue" />
|
||||
</multicast>
|
||||
</address>
|
||||
|
||||
<address name="secured_topic_shared">
|
||||
<multicast>
|
||||
<queue name="nonDurable.secured_topic_shared/queue" purge-on-no-consumers="true" />
|
||||
</multicast>
|
||||
</address>
|
||||
|
||||
<address name="secured_topic_durable">
|
||||
<multicast>
|
||||
<queue name="clientId.secured_topic_durable/queue" />
|
||||
</multicast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="a,b"/>
|
||||
<permission type="deleteNonDurableQueue" roles="a,b"/>
|
||||
<permission type="createDurableQueue" roles="a,b"/>
|
||||
<permission type="deleteDurableQueue" roles="a,b"/>
|
||||
<permission type="browse" roles="a"/>
|
||||
<permission type="send" roles="a,b"/>
|
||||
<permission type="consume" roles="a,b" />
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="a"/>
|
||||
</security-setting>
|
||||
<security-setting match="secured_topic_shared_durable">
|
||||
<permission type="createNonDurableQueue" roles="a"/>
|
||||
<permission type="deleteNonDurableQueue" roles="a"/>
|
||||
<permission type="createDurableQueue" roles="a"/>
|
||||
<permission type="deleteDurableQueue" roles="a"/>
|
||||
<permission type="browse" roles="a"/>
|
||||
<permission type="send" roles="a,b"/>
|
||||
<permission type="consume" roles="a,b" />
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="a"/>
|
||||
</security-setting>
|
||||
<security-setting match="secured_topic_shared">
|
||||
<permission type="createNonDurableQueue" roles="a"/>
|
||||
<permission type="deleteNonDurableQueue" roles="a"/>
|
||||
<permission type="createDurableQueue" roles="a"/>
|
||||
<permission type="deleteDurableQueue" roles="a"/>
|
||||
<permission type="browse" roles="a"/>
|
||||
<permission type="send" roles="a,b"/>
|
||||
<permission type="consume" roles="a,b" />
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="a"/>
|
||||
</security-setting>
|
||||
<security-setting match="secured_topic_durable">
|
||||
<permission type="createNonDurableQueue" roles="a"/>
|
||||
<permission type="deleteNonDurableQueue" roles="a"/>
|
||||
<permission type="createDurableQueue" roles="a"/>
|
||||
<permission type="deleteDurableQueue" roles="a"/>
|
||||
<permission type="browse" roles="a"/>
|
||||
<permission type="send" roles="a,b"/>
|
||||
<permission type="consume" roles="a,b" />
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="a"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<auto-create-queues>false</auto-create-queues>
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<max-size-bytes>10Mb</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>BLOCK</address-full-policy>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
</core>
|
||||
</configuration>
|
Loading…
x
Reference in New Issue
Block a user