From 44b7e455cb93ca0151793ee24de40b0a7a58301b Mon Sep 17 00:00:00 2001 From: Michael Andre Pearce Date: Tue, 13 Jun 2017 09:23:33 +0100 Subject: [PATCH] ARTEMIS-1205: AMQP Shared Durable Subscriber incorrect behaviour Use AcitveMQDestination for subscription naming, fixing and aligning queue naming in the process. The change is behind a configuration toggle so to avoid causing any breaking changes for uses not expecting. --- .../config/ActiveMQDefaultConfiguration.java | 6 ++ .../jms/client/ActiveMQDestination.java | 6 +- .../artemis/jms/client/ActiveMQSession.java | 6 +- .../amqp/broker/ProtonProtocolManager.java | 3 +- .../client/AMQPClientConnectionFactory.java | 4 +- .../amqp/proton/AMQPConnectionContext.java | 8 ++ .../proton/ProtonServerSenderContext.java | 52 +++++++++--- .../protocol/openwire/OpenWireConnection.java | 2 +- .../protocol/openwire/amq/AMQConsumer.java | 2 +- .../ra/inflow/ActiveMQMessageHandler.java | 2 +- .../artemis/core/config/Configuration.java | 10 +++ .../core/config/impl/ConfigurationImpl.java | 14 +++ .../impl/FileConfigurationParser.java | 6 ++ .../schema/artemis-configuration.xsd | 10 +++ .../impl/DefaultsFileConfigurationTest.java | 2 + .../config/impl/FileConfigurationTest.java | 1 + .../ConfigurationTest-full-config.xml | 1 + .../amqp/ClientDefinedMultiConsumerTest.java | 32 +++---- .../amqp/JMSClientTestSupport.java | 52 ++++++++++++ .../amqp/JMSDurableConsumerTest.java | 22 +++++ .../amqp/JMSSharedConsumerTest.java | 81 ++++++------------ .../amqp/JMSSharedDurableConsumerTest.java | 85 +++++++------------ 22 files changed, 257 insertions(+), 150 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 6bd0ae8340..7ace35c944 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -442,6 +442,8 @@ public final class ActiveMQDefaultConfiguration { // Default period to wait between configuration file checks public static final long DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD = 5000; + public static final boolean DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING = false; + public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2; public static final int DEFAULT_MAX_DISK_USAGE = 100; @@ -1207,6 +1209,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD; } + public static boolean getDefaultAmqpUseCoreSubscriptionNaming() { + return DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING; + } + /** * The default global max size. -1 = no global max size. */ diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java index 297efe8c7c..0bf4dd6258 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java @@ -102,9 +102,9 @@ public class ActiveMQDestination implements Destination, Serializable, Reference } } - public static String createQueueNameForDurableSubscription(final boolean isDurable, - final String clientID, - final String subscriptionName) { + public static String createQueueNameForSubscription(final boolean isDurable, + final String clientID, + final String subscriptionName) { if (clientID != null) { if (isDurable) { return ActiveMQDestination.escape(clientID) + SEPARATOR + diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index a8aceec8ef..374a985e26 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); } - queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName)); + queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName)); if (durability == ConsumerDurability.DURABLE) { try { @@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); } - queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), subscriptionName)); + queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName)); QueueQuery subResponse = session.queueQuery(queueName); @@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new IllegalStateException("Cannot unsubscribe using a QueueSession"); } - SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), name)); + SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name)); try { QueueQuery response = session.queueQuery(queueName); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index b5325fb9ef..d36f18e2d2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -116,7 +116,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager connectionProperties; private final int ttl; + private final boolean useCoreSubscriptionNaming; public AMQPClientConnectionFactory(ActiveMQServer server, String containerId, Map connectionProperties, int ttl) { this.server = server; this.containerId = containerId; this.connectionProperties = connectionProperties; this.ttl = ttl; + this.useCoreSubscriptionNaming = false; } public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional eventHandler) { @@ -52,7 +54,7 @@ public class AMQPClientConnectionFactory { Executor executor = server.getExecutorFactory().getExecutor(); - AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool()); + AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool()); eventHandler.ifPresent(amqpConnection::addEventHandler); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 4a46a8a179..0ab417117c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -74,16 +74,20 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH private final ProtonProtocolManager protocolManager; + private final boolean useCoreSubscriptionNaming; + public AMQPConnectionContext(ProtonProtocolManager protocolManager, AMQPConnectionCallback connectionSP, String containerId, int idleTimeout, int maxFrameSize, int channelMax, + boolean useCoreSubscriptionNaming, ScheduledExecutorService scheduledPool) { this.protocolManager = protocolManager; this.connectionCallback = connectionSP; + this.useCoreSubscriptionNaming = useCoreSubscriptionNaming; this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis"); @@ -260,6 +264,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH } } + public boolean isUseCoreSubscriptionNaming() { + return useCoreSubscriptionNaming; + } + @Override public void onInit(Connection connection) throws Exception { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ad8bee79bd..8f8222b1a2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; @@ -68,6 +69,7 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Sender; import org.jboss.logging.Logger; @@ -188,7 +190,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // subscription queue String clientId = getClientId(); String pubId = sender.getName(); - queue = createQueueName(clientId, pubId, true, global, false); + global = hasRemoteDesiredCapability(sender, GLOBAL); + queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false); QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false); multicast = true; routingTypeToUse = RoutingType.MULTICAST; @@ -343,7 +346,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // id and link name String clientId = getClientId(); String pubId = sender.getName(); - queue = createQueueName(clientId, pubId, shared, global, false); + queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false); QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); if (result.isExists()) { @@ -369,7 +372,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // otherwise we are a volatile subscription isVolatile = true; if (shared && sender.getName() != null) { - queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile); + queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile); try { sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); } catch (ActiveMQQueueExistsException e) { @@ -493,7 +496,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (pubId.contains("|")) { pubId = pubId.split("\\|")[0]; } - String queue = createQueueName(clientId, pubId, shared, global, isVolatile); + String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile); result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); //only delete if it isn't volatile and has no consumers if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { @@ -733,20 +736,43 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return false; } - private static String createQueueName(String clientId, + private static boolean hasRemoteDesiredCapability(Link link, Symbol capability) { + Symbol[] remoteDesiredCapabilities = link.getRemoteDesiredCapabilities(); + if (remoteDesiredCapabilities != null) { + for (Symbol cap : remoteDesiredCapabilities) { + if (capability.equals(cap)) { + return true; + } + } + } + return false; + } + + private static String createQueueName(boolean useCoreSubscriptionNaming, + String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) { - String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId; - if (shared) { - if (queue.contains("|")) { - queue = queue.split("\\|")[0]; - } - if (isVolatile) { - queue = "nonDurable" + "." + queue; + if (useCoreSubscriptionNaming) { + final boolean durable = !isVolatile; + final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId; + final String clientID = clientId == null || clientId.isEmpty() || global ? null : clientId; + return ActiveMQDestination.createQueueNameForSubscription(durable, clientID, subscriptionName); + } else { + String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId; + if (shared) { + if (queue.contains("|")) { + queue = queue.split("\\|")[0]; + } + if (isVolatile) { + queue += ":shared-volatile"; + } + if (global) { + queue += ":global"; + } } + return queue; } - return queue; } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index c63d26611e..a56901e638 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1093,7 +1093,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception { - SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); + SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); server.destroyQueue(subQueueName); return null; diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 3bdee8bda2..6bba4e13bc 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -150,7 +150,7 @@ public class AMQConsumer { addressInfo.addRoutingType(RoutingType.MULTICAST); } if (isDurable) { - queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName)); + queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName)); QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName); if (result.isExists()) { // Already exists diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index 4ff63571ed..bb2f87087a 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -112,7 +112,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList // Create the message consumer SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector); if (activation.isTopic() && spec.isSubscriptionDurable()) { - SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, spec.getClientID(), spec.getSubscriptionName())); + SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName())); QueueQuery subResponse = session.queueQuery(queueName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index aad3968b5e..9d5d4a8d4a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -289,6 +289,16 @@ public interface Configuration { */ Configuration setConnectionTTLOverride(long ttl); + /** + * Returns if to use Core subscription naming for AMQP. + */ + boolean isAmqpUseCoreSubscriptionNaming(); + + /** + * Sets if to use Core subscription naming for AMQP. + */ + Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming); + /** * Returns whether code coming from connection is executed asynchronously or not.
* Default value is diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 8edeb5b1a3..fb678b6546 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -262,6 +262,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private Long globalMaxSize; + private boolean amqpUseCoreSubscriptionNaming = ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming(); + private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage(); private int diskScanPeriod = ActiveMQDefaultConfiguration.getDefaultDiskScanPeriod(); @@ -453,6 +455,18 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + @Override + public boolean isAmqpUseCoreSubscriptionNaming() { + return amqpUseCoreSubscriptionNaming; + } + + @Override + public Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming) { + this.amqpUseCoreSubscriptionNaming = amqpUseCoreSubscriptionNaming; + return this; + } + + @Override public boolean isAsyncConnectionExecutionEnabled() { return asyncConnectionExecutionEnabled; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 524332552e..a3d59f6fc0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -219,6 +219,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String INTERNAL_NAMING_PREFIX = "internal-naming-prefix"; + private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming"; + + // Attributes ---------------------------------------------------- private boolean validateAIO = false; @@ -342,6 +345,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setInternalNamingPrefix(getString(e, INTERNAL_NAMING_PREFIX, config.getInternalNamingPrefix(), Validators.NO_CHECK)); + config.setAmqpUseCoreSubscriptionNaming(getBoolean(e, AMQP_USE_CORE_SUBSCRIPTION_NAMING, config.isAmqpUseCoreSubscriptionNaming())); + + // parsing cluster password String passwordText = getString(e, "cluster-password", null, Validators.NO_CHECK); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index b0e8502f39..6af18bd233 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -52,6 +52,16 @@ + + + + This enables making AMQP subscription queue names, match core queue names, for better interoperability between protocols. + Note: Enabling this to an existing broker if pre-existing amqp durable subscriptions already existed will require + clients to re-subscribe and to clean up old subscription names. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java index 07d5f58415..a07c797ea8 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java @@ -133,6 +133,8 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest { Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(), conf.isGracefulShutdownEnabled()); Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout()); + + Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming(), conf.isAmqpUseCoreSubscriptionNaming()); } // Protected --------------------------------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 2f1836503c..28ad83e556 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -92,6 +92,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { Assert.assertEquals("pagingdir", conf.getPagingDirectory()); Assert.assertEquals("somedir", conf.getBindingsDirectory()); Assert.assertEquals(false, conf.isCreateBindingsDir()); + Assert.assertEquals(true, conf.isAmqpUseCoreSubscriptionNaming()); Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO()); Assert.assertEquals("somedir2", conf.getJournalDirectory()); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 0691e957e4..ca10eb99c8 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -122,6 +122,7 @@ false + true
address1
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java index 3f8da1af7c..51c70ee47c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java @@ -57,15 +57,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); receiver2.close(); //check its been deleted Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null; + return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; } }, 1000); connection.close(); @@ -76,7 +76,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AddressInfo addressInfo = new AddressInfo(address); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); server.addAddressInfo(addressInfo); - server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("nonDurable.myClientId.mySub"), null, true, false, -1, false, false); + server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect("myClientId")); @@ -91,12 +91,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); receiver2.close(); //check its **Hasn't** been deleted - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); connection.close(); } @@ -119,14 +119,14 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); //check its been deleted connection.close(); Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null; + return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; } }, 1000); } @@ -150,15 +150,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); receiver2.close(); //check its been deleted Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")) == null; + return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null; } }, 1000); connection.close(); @@ -287,12 +287,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); receiver2.close(); //check its been deleted - assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub"))); + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); connection.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java index edd4968191..3f96711976 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java @@ -23,6 +23,7 @@ import javax.jms.Connection; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory; import org.jboss.logging.Logger; import org.junit.After; @@ -154,4 +155,55 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { return connection; } + + + protected String getBrokerCoreJMSConnectionString() { + + try { + int port = AMQP_PORT; + + String uri = null; + + if (isUseSSL()) { + uri = "tcp://127.0.0.1:" + port; + } else { + uri = "tcp://127.0.0.1:" + port; + } + + if (!getJmsConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getJmsConnectionURIOptions(); + } + + return uri; + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected Connection createCoreConnection() throws JMSException { + return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true); + } + + private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { + ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString); + + Connection connection = trackJMSConnection(factory.createConnection(username, password)); + + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } + + if (start) { + connection.start(); + } + + return connection; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java index 26097f6d37..31de59dcc0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -31,11 +33,31 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class JMSDurableConsumerTest extends JMSClientTestSupport { + @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true}, {false} + }); + } + + /* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */ + @Parameterized.Parameter(0) + public boolean amqpUseCoreSubscriptionNaming; + + @Override + protected void addConfiguration(ActiveMQServer server) { + server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming); + } + @Test(timeout = 30000) public void testDurableConsumerAsync() throws Exception { final CountDownLatch latch = new CountDownLatch(1); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java index c49fcff978..4113e4eea3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp; import javax.jms.Connection; import javax.jms.DeliveryMode; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -27,11 +26,33 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; -import org.junit.Test; +import java.util.Arrays; +import java.util.Collection; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) public class JMSSharedConsumerTest extends JMSClientTestSupport { + @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true}, {false} + }); + } + + /* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */ + @Parameterized.Parameter(0) + public boolean amqpUseCoreSubscriptionNaming; + + @Override + protected void addConfiguration(ActiveMQServer server) { + server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming); + } + @Override protected String getConfiguredProtocols() { return "AMQP,OPENWIRE,CORE"; @@ -94,6 +115,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport { @Test(timeout = 30000) public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception { + org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming); Connection connection = createConnection(); //AMQP Connection connection2 = createCoreConnection(); //CORE @@ -104,6 +126,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport { @Test(timeout = 30000) public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception { + org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming); Connection connection = createCoreConnection(); //CORE Connection connection2 = createConnection(); //AMQP @@ -111,56 +134,4 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport { testSharedConsumer(connection, connection2); } - - - protected String getBrokerCoreJMSConnectionString() { - - try { - int port = AMQP_PORT; - - String uri = null; - - if (isUseSSL()) { - uri = "tcp://127.0.0.1:" + port; - } else { - uri = "tcp://127.0.0.1:" + port; - } - - if (!getJmsConnectionURIOptions().isEmpty()) { - uri = uri + "?" + getJmsConnectionURIOptions(); - } - - return uri; - } catch (Exception e) { - throw new RuntimeException(); - } - } - - protected Connection createCoreConnection() throws JMSException { - return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true); - } - - private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { - ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString); - - Connection connection = trackJMSConnection(factory.createConnection(username, password)); - - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - - if (clientId != null && !clientId.isEmpty()) { - connection.setClientID(clientId); - } - - if (start) { - connection.start(); - } - - return connection; - } - } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java index 040506ba4b..ad0d9dd66e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp; import javax.jms.Connection; import javax.jms.DeliveryMode; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -27,11 +26,33 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; -import org.junit.Test; +import java.util.Arrays; +import java.util.Collection; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { + @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true}, {false} + }); + } + + /* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */ + @Parameterized.Parameter(0) + public boolean amqpUseCoreSubscriptionNaming; + + @Override + protected void addConfiguration(ActiveMQServer server) { + server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming); + } + @Override protected String getConfiguredProtocols() { return "AMQP,OPENWIRE,CORE"; @@ -68,6 +89,10 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { } assertNotNull("Should have received a message by now.", received); assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + + consumer1.close(); + consumer2.close(); + session1.unsubscribe("SharedConsumer"); } finally { connection1.close(); connection2.close(); @@ -94,6 +119,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { @Test(timeout = 30000) public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception { + org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming); Connection connection = createConnection(); //AMQP Connection connection2 = createCoreConnection(); //CORE @@ -104,6 +130,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { @Test(timeout = 30000) public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception { + org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming); Connection connection = createCoreConnection(); //CORE Connection connection2 = createConnection(); //AMQP @@ -111,56 +138,4 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { testSharedDurableConsumer(connection, connection2); } - - - protected String getBrokerCoreJMSConnectionString() { - - try { - int port = AMQP_PORT; - - String uri = null; - - if (isUseSSL()) { - uri = "tcp://127.0.0.1:" + port; - } else { - uri = "tcp://127.0.0.1:" + port; - } - - if (!getJmsConnectionURIOptions().isEmpty()) { - uri = uri + "?" + getJmsConnectionURIOptions(); - } - - return uri; - } catch (Exception e) { - throw new RuntimeException(); - } - } - - protected Connection createCoreConnection() throws JMSException { - return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true); - } - - private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { - ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString); - - Connection connection = trackJMSConnection(factory.createConnection(username, password)); - - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - - if (clientId != null && !clientId.isEmpty()) { - connection.setClientID(clientId); - } - - if (start) { - connection.start(); - } - - return connection; - } - }