ARTEMIS-1205: AMQP Shared Durable Subscriber incorrect behaviour

Use AcitveMQDestination for subscription naming, fixing and aligning queue naming in the process.

The change is behind a configuration toggle so to avoid causing any breaking changes for uses not expecting.
This commit is contained in:
Michael Andre Pearce 2017-06-13 09:23:33 +01:00 committed by Martyn Taylor
parent 8dd6b712fc
commit 44b7e455cb
22 changed files with 257 additions and 150 deletions

View File

@ -442,6 +442,8 @@ public final class ActiveMQDefaultConfiguration {
// Default period to wait between configuration file checks // Default period to wait between configuration file checks
public static final long DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD = 5000; public static final long DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD = 5000;
public static final boolean DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING = false;
public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2; public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2;
public static final int DEFAULT_MAX_DISK_USAGE = 100; public static final int DEFAULT_MAX_DISK_USAGE = 100;
@ -1207,6 +1209,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD; return DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD;
} }
public static boolean getDefaultAmqpUseCoreSubscriptionNaming() {
return DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING;
}
/** /**
* The default global max size. -1 = no global max size. * The default global max size. -1 = no global max size.
*/ */

View File

@ -102,7 +102,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
} }
} }
public static String createQueueNameForDurableSubscription(final boolean isDurable, public static String createQueueNameForSubscription(final boolean isDurable,
final String clientID, final String clientID,
final String subscriptionName) { final String subscriptionName) {
if (clientID != null) { if (clientID != null) {

View File

@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
} }
queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName)); queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName));
if (durability == ConsumerDurability.DURABLE) { if (durability == ConsumerDurability.DURABLE) {
try { try {
@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
} }
queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), subscriptionName)); queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName));
QueueQuery subResponse = session.queueQuery(queueName); QueueQuery subResponse = session.queueQuery(queueName);
@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new IllegalStateException("Cannot unsubscribe using a QueueSession"); throw new IllegalStateException("Cannot unsubscribe using a QueueSession");
} }
SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), name)); SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name));
try { try {
QueueQuery response = session.queueQuery(queueName); QueueQuery response = session.queueQuery(queueName);

View File

@ -116,7 +116,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
} }
String id = server.getConfiguration().getName(); String id = server.getConfiguration().getName();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool()); boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
Executor executor = server.getExecutorFactory().getExecutor(); Executor executor = server.getExecutorFactory().getExecutor();

View File

@ -39,12 +39,14 @@ public class AMQPClientConnectionFactory {
private final String containerId; private final String containerId;
private final Map<Symbol, Object> connectionProperties; private final Map<Symbol, Object> connectionProperties;
private final int ttl; private final int ttl;
private final boolean useCoreSubscriptionNaming;
public AMQPClientConnectionFactory(ActiveMQServer server, String containerId, Map<Symbol, Object> connectionProperties, int ttl) { public AMQPClientConnectionFactory(ActiveMQServer server, String containerId, Map<Symbol, Object> connectionProperties, int ttl) {
this.server = server; this.server = server;
this.containerId = containerId; this.containerId = containerId;
this.connectionProperties = connectionProperties; this.connectionProperties = connectionProperties;
this.ttl = ttl; this.ttl = ttl;
this.useCoreSubscriptionNaming = false;
} }
public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) { public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) {
@ -52,7 +54,7 @@ public class AMQPClientConnectionFactory {
Executor executor = server.getExecutorFactory().getExecutor(); Executor executor = server.getExecutorFactory().getExecutor();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool()); AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
eventHandler.ifPresent(amqpConnection::addEventHandler); eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);

View File

@ -74,16 +74,20 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
private final ProtonProtocolManager protocolManager; private final ProtonProtocolManager protocolManager;
private final boolean useCoreSubscriptionNaming;
public AMQPConnectionContext(ProtonProtocolManager protocolManager, public AMQPConnectionContext(ProtonProtocolManager protocolManager,
AMQPConnectionCallback connectionSP, AMQPConnectionCallback connectionSP,
String containerId, String containerId,
int idleTimeout, int idleTimeout,
int maxFrameSize, int maxFrameSize,
int channelMax, int channelMax,
boolean useCoreSubscriptionNaming,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
this.protocolManager = protocolManager; this.protocolManager = protocolManager;
this.connectionCallback = connectionSP; this.connectionCallback = connectionSP;
this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis"); connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
@ -260,6 +264,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
} }
} }
public boolean isUseCoreSubscriptionNaming() {
return useCoreSubscriptionNaming;
}
@Override @Override
public void onInit(Connection connection) throws Exception { public void onInit(Connection connection) throws Exception {

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
@ -68,6 +69,7 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -188,7 +190,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// subscription queue // subscription queue
String clientId = getClientId(); String clientId = getClientId();
String pubId = sender.getName(); String pubId = sender.getName();
queue = createQueueName(clientId, pubId, true, global, false); global = hasRemoteDesiredCapability(sender, GLOBAL);
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false); QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
multicast = true; multicast = true;
routingTypeToUse = RoutingType.MULTICAST; routingTypeToUse = RoutingType.MULTICAST;
@ -343,7 +346,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// id and link name // id and link name
String clientId = getClientId(); String clientId = getClientId();
String pubId = sender.getName(); String pubId = sender.getName();
queue = createQueueName(clientId, pubId, shared, global, false); queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
if (result.isExists()) { if (result.isExists()) {
@ -369,7 +372,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// otherwise we are a volatile subscription // otherwise we are a volatile subscription
isVolatile = true; isVolatile = true;
if (shared && sender.getName() != null) { if (shared && sender.getName() != null) {
queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile); queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
try { try {
sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
@ -493,7 +496,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (pubId.contains("|")) { if (pubId.contains("|")) {
pubId = pubId.split("\\|")[0]; pubId = pubId.split("\\|")[0];
} }
String queue = createQueueName(clientId, pubId, shared, global, isVolatile); String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
//only delete if it isn't volatile and has no consumers //only delete if it isn't volatile and has no consumers
if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
@ -733,20 +736,43 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return false; return false;
} }
private static String createQueueName(String clientId, private static boolean hasRemoteDesiredCapability(Link link, Symbol capability) {
Symbol[] remoteDesiredCapabilities = link.getRemoteDesiredCapabilities();
if (remoteDesiredCapabilities != null) {
for (Symbol cap : remoteDesiredCapabilities) {
if (capability.equals(cap)) {
return true;
}
}
}
return false;
}
private static String createQueueName(boolean useCoreSubscriptionNaming,
String clientId,
String pubId, String pubId,
boolean shared, boolean shared,
boolean global, boolean global,
boolean isVolatile) { boolean isVolatile) {
if (useCoreSubscriptionNaming) {
final boolean durable = !isVolatile;
final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId;
final String clientID = clientId == null || clientId.isEmpty() || global ? null : clientId;
return ActiveMQDestination.createQueueNameForSubscription(durable, clientID, subscriptionName);
} else {
String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId; String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
if (shared) { if (shared) {
if (queue.contains("|")) { if (queue.contains("|")) {
queue = queue.split("\\|")[0]; queue = queue.split("\\|")[0];
} }
if (isVolatile) { if (isVolatile) {
queue = "nonDurable" + "." + queue; queue += ":shared-volatile";
}
if (global) {
queue += ":global";
} }
} }
return queue; return queue;
} }
} }
}

View File

@ -1093,7 +1093,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override @Override
public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception { public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
server.destroyQueue(subQueueName); server.destroyQueue(subQueueName);
return null; return null;

View File

@ -150,7 +150,7 @@ public class AMQConsumer {
addressInfo.addRoutingType(RoutingType.MULTICAST); addressInfo.addRoutingType(RoutingType.MULTICAST);
} }
if (isDurable) { if (isDurable) {
queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName)); queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName));
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName); QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
if (result.isExists()) { if (result.isExists()) {
// Already exists // Already exists

View File

@ -112,7 +112,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
// Create the message consumer // Create the message consumer
SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector); SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
if (activation.isTopic() && spec.isSubscriptionDurable()) { if (activation.isTopic() && spec.isSubscriptionDurable()) {
SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, spec.getClientID(), spec.getSubscriptionName())); SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
QueueQuery subResponse = session.queueQuery(queueName); QueueQuery subResponse = session.queueQuery(queueName);

View File

@ -289,6 +289,16 @@ public interface Configuration {
*/ */
Configuration setConnectionTTLOverride(long ttl); Configuration setConnectionTTLOverride(long ttl);
/**
* Returns if to use Core subscription naming for AMQP.
*/
boolean isAmqpUseCoreSubscriptionNaming();
/**
* Sets if to use Core subscription naming for AMQP.
*/
Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming);
/** /**
* Returns whether code coming from connection is executed asynchronously or not. <br> * Returns whether code coming from connection is executed asynchronously or not. <br>
* Default value is * Default value is

View File

@ -262,6 +262,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private Long globalMaxSize; private Long globalMaxSize;
private boolean amqpUseCoreSubscriptionNaming = ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming();
private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage(); private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage();
private int diskScanPeriod = ActiveMQDefaultConfiguration.getDefaultDiskScanPeriod(); private int diskScanPeriod = ActiveMQDefaultConfiguration.getDefaultDiskScanPeriod();
@ -453,6 +455,18 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this; return this;
} }
@Override
public boolean isAmqpUseCoreSubscriptionNaming() {
return amqpUseCoreSubscriptionNaming;
}
@Override
public Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming) {
this.amqpUseCoreSubscriptionNaming = amqpUseCoreSubscriptionNaming;
return this;
}
@Override @Override
public boolean isAsyncConnectionExecutionEnabled() { public boolean isAsyncConnectionExecutionEnabled() {
return asyncConnectionExecutionEnabled; return asyncConnectionExecutionEnabled;

View File

@ -219,6 +219,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String INTERNAL_NAMING_PREFIX = "internal-naming-prefix"; private static final String INTERNAL_NAMING_PREFIX = "internal-naming-prefix";
private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming";
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
private boolean validateAIO = false; private boolean validateAIO = false;
@ -342,6 +345,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setInternalNamingPrefix(getString(e, INTERNAL_NAMING_PREFIX, config.getInternalNamingPrefix(), Validators.NO_CHECK)); config.setInternalNamingPrefix(getString(e, INTERNAL_NAMING_PREFIX, config.getInternalNamingPrefix(), Validators.NO_CHECK));
config.setAmqpUseCoreSubscriptionNaming(getBoolean(e, AMQP_USE_CORE_SUBSCRIPTION_NAMING, config.isAmqpUseCoreSubscriptionNaming()));
// parsing cluster password // parsing cluster password
String passwordText = getString(e, "cluster-password", null, Validators.NO_CHECK); String passwordText = getString(e, "cluster-password", null, Validators.NO_CHECK);

View File

@ -52,6 +52,16 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="amqp-use-core-subscription-naming" type="xsd:boolean" maxOccurs="1" minOccurs="0" default="false">
<xsd:annotation>
<xsd:documentation>
This enables making AMQP subscription queue names, match core queue names, for better interoperability between protocols.
Note: Enabling this to an existing broker if pre-existing amqp durable subscriptions already existed will require
clients to re-subscribe and to clean up old subscription names.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="resolve-protocols" type="xsd:boolean" default="true" maxOccurs="1" <xsd:element name="resolve-protocols" type="xsd:boolean" default="true" maxOccurs="1"
minOccurs="0"> minOccurs="0">
<xsd:annotation> <xsd:annotation>

View File

@ -133,6 +133,8 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(), conf.isGracefulShutdownEnabled()); Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(), conf.isGracefulShutdownEnabled());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout()); Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming(), conf.isAmqpUseCoreSubscriptionNaming());
} }
// Protected --------------------------------------------------------------------------------------------- // Protected ---------------------------------------------------------------------------------------------

View File

@ -92,6 +92,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals("pagingdir", conf.getPagingDirectory()); Assert.assertEquals("pagingdir", conf.getPagingDirectory());
Assert.assertEquals("somedir", conf.getBindingsDirectory()); Assert.assertEquals("somedir", conf.getBindingsDirectory());
Assert.assertEquals(false, conf.isCreateBindingsDir()); Assert.assertEquals(false, conf.isCreateBindingsDir());
Assert.assertEquals(true, conf.isAmqpUseCoreSubscriptionNaming());
Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO()); Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO());
Assert.assertEquals("somedir2", conf.getJournalDirectory()); Assert.assertEquals("somedir2", conf.getJournalDirectory());

View File

@ -122,6 +122,7 @@
<exclusive>false</exclusive> <exclusive>false</exclusive>
</divert> </divert>
</diverts> </diverts>
<amqp-use-core-subscription-naming>true</amqp-use-core-subscription-naming>
<queues> <queues>
<queue name="queue1"> <queue name="queue1">
<address>address1</address> <address>address1</address>

View File

@ -57,15 +57,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
receiver.close(); receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
receiver2.close(); receiver2.close();
//check its been deleted //check its been deleted
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisfied() throws Exception { public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null; return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
} }
}, 1000); }, 1000);
connection.close(); connection.close();
@ -76,7 +76,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.addAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("nonDurable.myClientId.mySub"), null, true, false, -1, false, false); server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId")); AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -91,12 +91,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
receiver.close(); receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
receiver2.close(); receiver2.close();
//check its **Hasn't** been deleted //check its **Hasn't** been deleted
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
connection.close(); connection.close();
} }
@ -119,14 +119,14 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
//check its been deleted //check its been deleted
connection.close(); connection.close();
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisfied() throws Exception { public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null; return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
} }
}, 1000); }, 1000);
} }
@ -150,15 +150,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount());
receiver.close(); receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
receiver2.close(); receiver2.close();
//check its been deleted //check its been deleted
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisfied() throws Exception { public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")) == null; return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null;
} }
}, 1000); }, 1000);
connection.close(); connection.close();
@ -287,12 +287,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount());
receiver.close(); receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
receiver2.close(); receiver2.close();
//check its been deleted //check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub"))); assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
connection.close(); connection.close();
} }

View File

@ -23,6 +23,7 @@ import javax.jms.Connection;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.After; import org.junit.After;
@ -154,4 +155,55 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return connection; return connection;
} }
protected String getBrokerCoreJMSConnectionString() {
try {
int port = AMQP_PORT;
String uri = null;
if (isUseSSL()) {
uri = "tcp://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createCoreConnection() throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -31,11 +33,31 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JMSDurableConsumerTest extends JMSClientTestSupport { public class JMSDurableConsumerTest extends JMSClientTestSupport {
@Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}
/* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */
@Parameterized.Parameter(0)
public boolean amqpUseCoreSubscriptionNaming;
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
}
@Test(timeout = 30000) @Test(timeout = 30000)
public void testDurableConsumerAsync() throws Exception { public void testDurableConsumerAsync() throws Exception {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
@ -27,11 +26,33 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; import java.util.Arrays;
import org.junit.Test; import java.util.Collection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JMSSharedConsumerTest extends JMSClientTestSupport { public class JMSSharedConsumerTest extends JMSClientTestSupport {
@Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}
/* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */
@Parameterized.Parameter(0)
public boolean amqpUseCoreSubscriptionNaming;
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
}
@Override @Override
protected String getConfiguredProtocols() { protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE"; return "AMQP,OPENWIRE,CORE";
@ -94,6 +115,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception { public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception {
org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
Connection connection = createConnection(); //AMQP Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE Connection connection2 = createCoreConnection(); //CORE
@ -104,6 +126,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception { public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception {
org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
Connection connection = createCoreConnection(); //CORE Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP Connection connection2 = createConnection(); //AMQP
@ -111,56 +134,4 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
testSharedConsumer(connection, connection2); testSharedConsumer(connection, connection2);
} }
protected String getBrokerCoreJMSConnectionString() {
try {
int port = AMQP_PORT;
String uri = null;
if (isUseSSL()) {
uri = "tcp://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createCoreConnection() throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
} }

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
@ -27,11 +26,33 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; import java.util.Arrays;
import org.junit.Test; import java.util.Collection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
@Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}
/* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */
@Parameterized.Parameter(0)
public boolean amqpUseCoreSubscriptionNaming;
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
}
@Override @Override
protected String getConfiguredProtocols() { protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE"; return "AMQP,OPENWIRE,CORE";
@ -68,6 +89,10 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
} }
assertNotNull("Should have received a message by now.", received); assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
consumer1.close();
consumer2.close();
session1.unsubscribe("SharedConsumer");
} finally { } finally {
connection1.close(); connection1.close();
connection2.close(); connection2.close();
@ -94,6 +119,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception { public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception {
org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
Connection connection = createConnection(); //AMQP Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE Connection connection2 = createCoreConnection(); //CORE
@ -104,6 +130,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception { public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception {
org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
Connection connection = createCoreConnection(); //CORE Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP Connection connection2 = createConnection(); //AMQP
@ -111,56 +138,4 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
testSharedDurableConsumer(connection, connection2); testSharedDurableConsumer(connection, connection2);
} }
protected String getBrokerCoreJMSConnectionString() {
try {
int port = AMQP_PORT;
String uri = null;
if (isUseSSL()) {
uri = "tcp://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createCoreConnection() throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
} }