ARTEMIS-4127 refactor multiprotocol JMS tests in AMQP package

Over time org.apache.activemq.artemis.tests.integration.amqp has become
home to many multi-protocol JMS tests even though the package is really
for AMQP-specific tests. This commit splits those tests out into their
own package.

This is a preliminary step to clarify these tests before I add another
one for a different issue.
This commit is contained in:
Justin Bertram 2023-01-12 00:08:23 -06:00 committed by clebertsuconic
parent abd62665ce
commit c190d1c72f
12 changed files with 695 additions and 250 deletions

View File

@ -16,16 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@ -39,144 +29,31 @@ import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
public class JMSMessageConsumerTest extends JMSClientTestSupport {
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Test(timeout = 30000)
public void testDeliveryModeAMQPProducerCoreConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testDeliveryMode(connection, connection2);
}
@Test(timeout = 30000)
public void testDeliveryModeAMQPProducerAMQPConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testDeliveryMode(connection, connection2);
}
@Test(timeout = 30000)
public void testDeliveryModeCoreProducerAMQPConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testDeliveryMode(connection, connection2);
}
@Test(timeout = 30000)
public void testDeliveryModeCoreProducerCoreConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testDeliveryMode(connection, connection2);
}
private void testDeliveryMode(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue1 = session1.createQueue(getQueueName());
javax.jms.Queue queue2 = session2.createQueue(getQueueName());
final MessageConsumer consumer2 = session2.createConsumer(queue2);
MessageProducer producer = session1.createProducer(queue1);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message received = consumer2.receive(100);
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 30000)
public void testPriorityAMQPProducerCoreConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testPriority(connection, connection2);
}
@Test(timeout = 30000)
public void testPriorityAMQPProducerAMQPConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testPriority(connection, connection2);
}
@Test(timeout = 30000)
public void testPriorityModeCoreProducerAMQPConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testPriority(connection, connection2);
}
@Test(timeout = 30000)
public void testPriorityCoreProducerCoreConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testPriority(connection, connection2);
}
private void testPriority(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue1 = session1.createQueue(getQueueName());
javax.jms.Queue queue2 = session2.createQueue(getQueueName());
final MessageConsumer consumer2 = session2.createConsumer(queue2);
MessageProducer producer = session1.createProducer(queue1);
producer.setPriority(2);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message received = consumer2.receive(100);
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
assertEquals(2, received.getJMSPriority());
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 60000)
public void testSelectorOnTopic() throws Exception {
doTestSelector(true);
@ -221,47 +98,6 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
}
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithCore() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> createCoreConnection(false));
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithOpenWire() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> createOpenWireConnection(false));
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithAMQP() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> JMSMessageConsumerTest.super.createConnection(false));
}
private void testDurableSubscriptionWithConfigurationManagedQueue(ConnectionSupplier connectionSupplier) throws Exception {
final String clientId = "bar";
final String subName = "foo";
final String queueName = DestinationUtil.createQueueNameForSubscription(true, clientId, subName).toString();
server.stop();
server.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setAddress("myTopic").setFilterString("color = 'BLUE'").setRoutingType(RoutingType.MULTICAST));
server.getConfiguration().setAmqpUseCoreSubscriptionNaming(true);
server.start();
try (Connection connection = connectionSupplier.createConnection()) {
connection.setClientID(clientId);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("myTopic");
MessageConsumer messageConsumer = session.createDurableSubscriber(destination, subName);
messageConsumer.close();
Queue queue = server.locateQueue(queueName);
assertNotNull(queue);
assertNotNull(queue.getFilter());
assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString());
}
}
@Test(timeout = 30000)
public void testSelectorsWithJMSTypeOnTopic() throws Exception {
doTestSelectorsWithJMSType(true);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
@ -34,22 +34,13 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Test;
public class JMSLVQTest extends JMSClientTestSupport {
public class JMSLVQTest extends MultiprotocolJMSClientTestSupport {
private static final String NORMAL_QUEUE_NAME = "NORMAL";
private static final String LVQ_QUEUE_NAME = "LVQ";
private static final String LVQ_CUSTOM_KEY_QUEUE_NAME = "LVQ_CUSTOM_KEY_QUEUE";
private static final String CUSTOM_KEY = "KEY";
private ConnectionSupplier AMQPConnection = () -> createConnection();
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setPersistenceEnabled(false);

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JMSMessageConsumerTest extends MultiprotocolJMSClientTestSupport {
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Test(timeout = 30000)
public void testDeliveryModeAMQPProducerCoreConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testDeliveryMode(connection, connection2);
}
@Test(timeout = 30000)
public void testDeliveryModeAMQPProducerAMQPConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testDeliveryMode(connection, connection2);
}
@Test(timeout = 30000)
public void testDeliveryModeCoreProducerAMQPConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testDeliveryMode(connection, connection2);
}
@Test(timeout = 30000)
public void testDeliveryModeCoreProducerCoreConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testDeliveryMode(connection, connection2);
}
private void testDeliveryMode(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue1 = session1.createQueue(getQueueName());
javax.jms.Queue queue2 = session2.createQueue(getQueueName());
final MessageConsumer consumer2 = session2.createConsumer(queue2);
MessageProducer producer = session1.createProducer(queue1);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message received = consumer2.receive(100);
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 30000)
public void testPriorityAMQPProducerCoreConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testPriority(connection, connection2);
}
@Test(timeout = 30000)
public void testPriorityAMQPProducerAMQPConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testPriority(connection, connection2);
}
@Test(timeout = 30000)
public void testPriorityModeCoreProducerAMQPConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testPriority(connection, connection2);
}
@Test(timeout = 30000)
public void testPriorityCoreProducerCoreConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testPriority(connection, connection2);
}
private void testPriority(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue1 = session1.createQueue(getQueueName());
javax.jms.Queue queue2 = session2.createQueue(getQueueName());
final MessageConsumer consumer2 = session2.createConsumer(queue2);
MessageProducer producer = session1.createProducer(queue1);
producer.setPriority(2);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message received = consumer2.receive(100);
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
assertEquals(2, received.getJMSPriority());
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithCore() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> createCoreConnection(false));
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithOpenWire() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> createOpenWireConnection(false));
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithAMQP() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> JMSMessageConsumerTest.super.createConnection(false));
}
private void testDurableSubscriptionWithConfigurationManagedQueue(ConnectionSupplier connectionSupplier) throws Exception {
final String clientId = "bar";
final String subName = "foo";
final String queueName = DestinationUtil.createQueueNameForSubscription(true, clientId, subName).toString();
server.stop();
server.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setAddress("myTopic").setFilterString("color = 'BLUE'").setRoutingType(RoutingType.MULTICAST));
server.getConfiguration().setAmqpUseCoreSubscriptionNaming(true);
server.start();
try (Connection connection = connectionSupplier.createConnection()) {
connection.setClientID(clientId);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("myTopic");
MessageConsumer messageConsumer = session.createDurableSubscriber(destination, subName);
messageConsumer.close();
Queue queue = server.locateQueue(queueName);
assertNotNull(queue);
assertNotNull(queue.getFilter());
assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString());
}
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,7 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
public class JMSMessageGroupsTest extends JMSClientTestSupport {
public class JMSMessageGroupsTest extends MultiprotocolJMSClientTestSupport {
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -49,15 +49,6 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport {
private static final int RECEIVE_TIMEOUT = 1000;
private static final String JMSX_GROUP_ID = "JmsGroupsTest";
private ConnectionSupplier AMQPConnection = () -> createConnection();
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
super.configureAddressPolicy(server);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
@ -47,19 +47,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test that various message types are handled as expected with an AMQP JMS client.
* Test that various message types are handled as expected between JMS clients.
*/
public class JMSMessageTypesTest extends JMSClientTestSupport {
public class JMSMessageTypesTest extends MultiprotocolJMSClientTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final int NUM_MESSAGES = 10;
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Test(timeout = 60000)
public void testAddressControlSendMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
@RunWith(Parameterized.class)
public class JMSNonDestructiveTest extends JMSClientTestSupport {
public class JMSNonDestructiveTest extends MultiprotocolJMSClientTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -71,9 +71,6 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
private static final String NON_DESTRUCTIVE_LVQ_QUEUE_NAME = "NON_DESTRUCTIVE_LVQ_QUEUE";
private static final String NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME = "NON_DESTRUCTIVE_LVQ_TOMBSTONE_QUEUE";
private ConnectionSupplier AMQPConnection = () -> createConnection();
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
protected final boolean persistenceEnabled;
protected final long scanPeriod;
@ -88,11 +85,6 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
return Arrays.asList(params);
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setPersistenceEnabled(persistenceEnabled);
@ -102,6 +94,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_LVQ_QUEUE_NAME, new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true));
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_TOMBSTONE_LVQ_QUEUE_NAME, new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true));
}
@Override
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
super.createAddressAndQueues(server);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.JMSException;
@ -33,19 +33,10 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Test;
public class JMSSelectorTest extends JMSClientTestSupport {
public class JMSSelectorTest extends MultiprotocolJMSClientTestSupport {
private static final String NORMAL_QUEUE_NAME = "NORMAL";
private ConnectionSupplier AMQPConnection = () -> createConnection();
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setPersistenceEnabled(false);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@ -36,7 +36,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JMSSharedConsumerTest extends JMSClientTestSupport {
public class JMSSharedConsumerTest extends MultiprotocolJMSClientTestSupport {
@Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
public static Collection<Object[]> parameters() {
@ -54,11 +54,6 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
private void testSharedConsumer(Connection connection1, Connection connection2) throws Exception {
testSharedConsumer(connection1, connection2, false);
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@ -38,7 +38,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
public class JMSSharedDurableConsumerTest extends MultiprotocolJMSClientTestSupport {
@Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
public static Collection<Object[]> parameters() {
@ -56,11 +56,6 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
private void testSharedDurableConsumer(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.JMSException;
@ -34,19 +34,10 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Test;
public class JMSXPathSelectorTest extends JMSClientTestSupport {
public class JMSXPathSelectorTest extends MultiprotocolJMSClientTestSupport {
private static final String NORMAL_QUEUE_NAME = "NORMAL";
private ConnectionSupplier AMQPConnection = () -> createConnection();
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Override
protected URI getBrokerQpidJMSConnectionURI() {
try {

View File

@ -0,0 +1,443 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class MultiprotocolJMSClientTestSupport extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected static LinkedList<Connection> jmsConnections = new LinkedList<>();
protected static final int PORT = 5672;
protected static final String BROKER_NAME = "localhost";
protected static final String NETTY_ACCEPTOR = "netty-acceptor";
protected String noprivUser = "noprivs";
protected String noprivPass = "noprivs";
protected String browseUser = "browser";
protected String browsePass = "browser";
protected String guestUser = "guest";
protected String guestPass = "guest";
protected String fullUser = "user";
protected String fullPass = "pass";
protected ActiveMQServer server;
protected MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
protected ConnectionSupplier AMQPConnection = () -> createConnection();
protected ConnectionSupplier CoreConnection = () -> createCoreConnection();
protected ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
@Before
@Override
public void setUp() throws Exception {
super.setUp();
// Bug in Qpid JMS not shutting down a connection thread on certain errors
// TODO - Reevaluate after Qpid JMS 0.23.0 is released.
disableCheckThread();
server = createServer();
}
@After
@Override
public void tearDown() throws Exception {
try {
for (Connection connection : jmsConnections) {
try {
connection.close();
} catch (Throwable ignored) {
ignored.printStackTrace();
}
}
} catch (Exception e) {
logger.warn("Exception during tearDown", e);
}
jmsConnections.clear();
try {
if (server != null) {
server.stop();
}
} finally {
super.tearDown();
}
}
protected boolean isAutoCreateQueues() {
return true;
}
protected boolean isAutoCreateAddresses() {
return true;
}
protected boolean isSecurityEnabled() {
return false;
}
protected String getDeadLetterAddress() {
return "ActiveMQ.DLQ";
}
protected ActiveMQServer createServer() throws Exception {
return createServer(PORT);
}
protected ActiveMQServer createServer(int port) throws Exception {
final ActiveMQServer server = this.createServer(true, true);
server.getConfiguration().getAcceptorConfigurations().clear();
server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, port));
server.getConfiguration().setName(BROKER_NAME);
server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port);
server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port);
server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port);
if (port == PORT) {
// we use the default large directory if the default port
// as some tests will assert number of files
server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory());
} else {
server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory() + port);
}
server.getConfiguration().setJMXManagementEnabled(true);
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.setMBeanServer(mBeanServer);
// Add any additional Acceptors needed for tests
addAdditionalAcceptors(server);
// Address configuration
configureAddressPolicy(server);
// Add optional security for tests that need it
configureBrokerSecurity(server);
// Add extra configuration
addConfiguration(server);
server.start();
// Prepare all addresses and queues for client tests.
createAddressAndQueues(server);
return server;
}
protected void addConfiguration(ActiveMQServer server) {
}
protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols());
HashMap<String, Object> amqpParams = new HashMap<>();
TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, amqpParams);
return tc;
}
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
protected void configureAddressPolicy(ActiveMQServer server) {
// Address configuration
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setAutoCreateQueues(isAutoCreateQueues());
addressSettings.setAutoCreateAddresses(isAutoCreateAddresses());
addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
server.getConfiguration().getAddressSettings().put("#", addressSettings);
Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) {
if (tc.getName().equals(NETTY_ACCEPTOR)) {
tc.getExtraParams().put("anycastPrefix", "anycast://");
tc.getExtraParams().put("multicastPrefix", "multicast://");
}
}
}
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
// None by default
}
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
// None by default
}
protected void configureBrokerSecurity(ActiveMQServer server) {
if (isSecurityEnabled()) {
enableSecurity(server);
} else {
server.getConfiguration().setSecurityEnabled(false);
}
}
protected void enableSecurity(ActiveMQServer server, String... securityMatches) {
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
// User additions
securityManager.getConfiguration().addUser(noprivUser, noprivPass);
securityManager.getConfiguration().addRole(noprivUser, "nothing");
securityManager.getConfiguration().addUser(browseUser, browsePass);
securityManager.getConfiguration().addRole(browseUser, "browser");
securityManager.getConfiguration().addUser(guestUser, guestPass);
securityManager.getConfiguration().addRole(guestUser, "guest");
securityManager.getConfiguration().addUser(fullUser, fullPass);
securityManager.getConfiguration().addRole(fullUser, "full");
// Configure roles
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
HashSet<Role> value = new HashSet<>();
value.add(new Role("nothing", false, false, false, false, false, false, false, false, false, false));
value.add(new Role("browser", false, false, false, false, false, false, false, true, false, false));
value.add(new Role("guest", false, true, false, false, false, false, false, true, false, false));
value.add(new Role("full", true, true, true, true, true, true, true, true, true, true));
securityRepository.addMatch("#", value);
for (String match : securityMatches) {
securityRepository.addMatch(match, value);
}
server.getConfiguration().setSecurityEnabled(true);
}
public String getTopicName() {
return getName() + "-Topic";
}
public String getQueueName() {
return getName();
}
public Queue getProxyToQueue(String queueName) {
return server.locateQueue(SimpleString.toSimpleString(queueName));
}
private Connection trackJMSConnection(Connection connection) {
jmsConnections.add(connection);
return connection;
}
protected String getJmsConnectionURIOptions() {
return "";
}
protected String getBrokerQpidJMSConnectionString() {
try {
String uri = "amqp://127.0.0.1:" + PORT;
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected URI getBrokerQpidJMSConnectionURI() {
try {
return new URI(getBrokerQpidJMSConnectionString());
} catch (Exception e) {
throw new RuntimeException();
}
}
protected URI getBrokerQpidJMSFailoverConnectionURI() {
try {
return new URI("failover:(" + getBrokerQpidJMSConnectionString() + ")");
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createConnection() throws JMSException {
return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, true);
}
protected Connection createFailoverConnection() throws JMSException {
return createConnection(getBrokerQpidJMSFailoverConnectionURI(), null, null, null, true);
}
protected Connection createConnection(boolean start) throws JMSException {
return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, start);
}
protected Connection createConnection(String clientId) throws JMSException {
return createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientId, true);
}
protected Connection createConnection(URI remoteURI,
String username,
String password,
String clientId,
boolean start) throws JMSException {
JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(exception -> exception.printStackTrace());
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
protected String getBrokerCoreJMSConnectionString() {
try {
String uri = "tcp://127.0.0.1:" + PORT;
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createCoreConnection() throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
protected Connection createCoreConnection(boolean start) throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, start);
}
private Connection createCoreConnection(String connectionString,
String username,
String password,
String clientId,
boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(exception -> exception.printStackTrace());
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
protected String getBrokerOpenWireJMSConnectionString() {
try {
String uri = "tcp://127.0.0.1:" + PORT;
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
} else {
uri = uri + "?wireFormat.cacheEnabled=true";
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createOpenWireConnection() throws JMSException {
return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true);
}
protected Connection createOpenWireConnection(boolean start) throws JMSException {
return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, start);
}
private Connection createOpenWireConnection(String connectionString,
String username,
String password,
String clientId,
boolean start) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(exception -> exception.printStackTrace());
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
interface ConnectionSupplier {
Connection createConnection() throws JMSException;
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This package contains tests that use multiple different protocols via
* JMS.
*
* Current tests exercise AMQP, Core, & OpenWire protocols.
*
* The tests either need to execute the same logic across all protocols to
* ensure consistent results or they need to interoperate to verify
* compatibility.
*/
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;