diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java index 40eb175e6e..1b1699f5e0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; +import org.apache.activemq.artemis.jms.client.ActiveMQTopic; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -54,6 +55,12 @@ public class ProtonProtocolManager implements ProtocolManager, Noti private final ProtonProtocolManagerFactory factory; + /* + * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for + * the address. This can be changed on the acceptor. + * */ + private String pubSubPrefix = ActiveMQTopic.JMS_TOPIC_ADDRESS_PREFIX; + public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -139,4 +146,13 @@ public class ProtonProtocolManager implements ProtocolManager, Noti public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { } + public String getPubSubPrefix() { + return pubSubPrefix; + } + + public void setPubSubPrefix(String pubSubPrefix) { + this.pubSubPrefix = pubSubPrefix; + } + + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 2350f9d4e8..5b73acfd18 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -184,6 +184,16 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false); } + @Override + public void createTemporaryQueue(String address, String queueName) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true); + } + + @Override + public void createDurableQueue(String address, String queueName) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true); + } + @Override public boolean queueQuery(String queueName) throws Exception { boolean queryResult = false; @@ -360,6 +370,16 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } } + @Override + public String getPubSubPrefix() { + return manager.getPubSubPrefix(); + } + + @Override + public void deleteQueue(String address) throws Exception { + manager.getServer().destroyQueue(new SimpleString(address)); + } + private void resetContext() { manager.getServer().getStorageManager().setContext(null); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java index 786d0d7fac..1abd96f281 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java @@ -31,4 +31,6 @@ public interface AMQPClientConnectionContext extends AMQPConnectionContext { void clientOpen(ClientSASL sasl) throws Exception; AMQPClientSessionContext createClientSession() throws ActiveMQAMQPException; + + void setContainer(String containerID); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java index 6cd0aa74db..b5184749e7 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java @@ -23,4 +23,6 @@ public interface AMQPClientSessionContext extends AMQPSessionContext { AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException; AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException; + + AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException; } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java index 0c0dbe03ba..630761fd2e 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java @@ -40,6 +40,12 @@ public interface AMQPSessionCallback { void createTemporaryQueue(String queueName) throws Exception; + void createTemporaryQueue(String address, String queueName) throws Exception; + + void createDurableQueue(String address, String queueName) throws Exception; + + void deleteQueue(String address) throws Exception; + boolean queueQuery(String queueName) throws Exception; void closeSender(Object brokerConsumer) throws Exception; @@ -82,4 +88,5 @@ public interface AMQPSessionCallback { int messageFormat, ByteBuf messageEncoded) throws Exception; + String getPubSubPrefix(); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index 262dc2a198..34e18736e2 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -32,6 +32,7 @@ import org.apache.qpid.proton.engine.Transport; import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPConnectionContext; import org.proton.plug.SASLResult; +import org.proton.plug.context.server.ProtonServerSenderContext; import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.handler.ProtonHandler; import org.proton.plug.handler.impl.DefaultEventHandler; @@ -163,6 +164,14 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl } } + public String getRemoteContainer() { + return handler.getConnection().getRemoteContainer(); + } + + public String getPubSubPrefix() { + return null; + } + // This listener will perform a bunch of things here class LocalListener extends DefaultEventHandler { @@ -265,7 +274,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl link.close(); ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); if (linkContext != null) { - linkContext.close(); + linkContext.close(true); } } @@ -274,6 +283,15 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl link.detach(); } + @Override + public void onDetach(Link link) throws Exception { + Object context = link.getContext(); + if (context instanceof ProtonServerSenderContext) { + ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context; + senderContext.close(false); + } + } + @Override public void onDelivery(Delivery delivery) throws Exception { ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext(); @@ -289,4 +307,6 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl } + + } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java index 6b209b83b2..29e3459e49 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java @@ -67,7 +67,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im * close the session * */ @Override - public void close() throws ActiveMQAMQPException { + public void close(boolean linkRemoteClose) throws ActiveMQAMQPException { closed = true; protonSession.removeSender(sender); synchronized (connection.getLock()) { @@ -84,7 +84,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im public void close(ErrorCondition condition) throws ActiveMQAMQPException { closed = true; sender.setCondition(condition); - close(); + close(false); } @Override diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java index 4286140b0f..ffc08d395e 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java @@ -53,14 +53,14 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable } @Override - public void close() throws ActiveMQAMQPException { + public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { protonSession.removeReceiver(receiver); } @Override public void close(ErrorCondition condition) throws ActiveMQAMQPException { receiver.setCondition(condition); - close(); + close(false); } public void flow(int credits) { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java index abb3115243..5b22944f02 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java @@ -85,7 +85,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i AbstractProtonContextSender protonConsumer = senders.remove(consumer); if (protonConsumer != null) { try { - protonConsumer.close(); + protonConsumer.close(false); } catch (ActiveMQAMQPException e) { protonConsumer.getSender().setTarget(null); @@ -116,7 +116,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i for (AbstractProtonReceiverContext protonProducer : receiversCopy) { try { - protonProducer.close(); + protonProducer.close(false); } catch (Exception e) { e.printStackTrace(); @@ -130,7 +130,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i for (AbstractProtonContextSender protonConsumer : protonSendersClone) { try { - protonConsumer.close(); + protonConsumer.close(false); } catch (Exception e) { e.printStackTrace(); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java index ad7ff4f307..d861394c1e 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java @@ -29,7 +29,11 @@ public interface ProtonDeliveryHandler { void onMessage(Delivery delivery) throws ActiveMQAMQPException; - void close() throws ActiveMQAMQPException; + /* + * we have to distinguish between a remote close on the link and a close via a connection or session as the latter mean + * that a link reattach can happen and we need to keep the underlying resource (queue/subscription) around for pub subs + * */ + void close(boolean remoteLinkClose) throws ActiveMQAMQPException; void close(ErrorCondition condition) throws ActiveMQAMQPException; } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java index 1b32b32e9a..e768bb47f4 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java @@ -116,7 +116,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } @Override - public void close() throws ActiveMQAMQPException { + public void close(boolean linkRemoteClose) throws ActiveMQAMQPException { //noop } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java index 76a7da9cc2..f4a43c1f8e 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java @@ -82,6 +82,11 @@ public class ProtonClientConnectionContext extends AbstractConnectionContext imp return sessionImpl; } + @Override + public void setContainer(String containerID) { + handler.getConnection().setContainer(containerID); + } + @Override protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException { AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java index 3b07a40e9b..b3e96bb6c2 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java @@ -64,12 +64,17 @@ public class ProtonClientSessionContext extends AbstractProtonSessionContext imp @Override public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException { + return createReceiver(address, address); + } + + @Override + public AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException { FutureRunnable futureRunnable = new FutureRunnable(1); ProtonClientReceiverContext amqpReceiver; synchronized (connection.getLock()) { - Receiver receiver = session.receiver(address); + Receiver receiver = session.receiver(name); Source source = new Source(); source.setAddress(address); receiver.setSource(source); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java index ae1caa4933..13b50e5964 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -26,6 +26,8 @@ import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -50,6 +52,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector"); private static final Symbol COPY = Symbol.valueOf("copy"); + private static final Symbol TOPIC = Symbol.valueOf("topic"); private Object brokerConsumer; @@ -81,7 +84,10 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple //todo add flow control try { // to do whatever you need to make the broker start sending messages to the consumer - sessionSPI.startSender(brokerConsumer); + //this could be null if a link reattach has happened + if (brokerConsumer != null) { + sessionSPI.startSender(brokerConsumer); + } //protonSession.getServerSession().receiveConsumerCredits(consumerID, -1); } catch (Exception e) { @@ -105,26 +111,58 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple /* * even tho the filter is a map it will only return a single filter unless a nolocal is also provided * */ - Map.Entry filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); - if (filter != null) { - selector = filter.getValue().getDescribed().toString(); - // Validate the Selector. - try { - SelectorParser.parse(selector); - } - catch (FilterException e) { - close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); - return; + if (source != null) { + Map.Entry filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); + if (filter != null) { + selector = filter.getValue().getDescribed().toString(); + // Validate the Selector. + try { + SelectorParser.parse(selector); + } + catch (FilterException e) { + close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); + return; + } } } + /* + * if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act + * like a subscription. + * */ + boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source); + //filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); //if (filter != null) { //todo implement nolocal filter //} + if (source == null) { + // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue + String clientId = connection.getRemoteContainer(); + String pubId = sender.getName(); + queue = clientId + ":" + pubId; + boolean exists = sessionSPI.queueQuery(queue); - if (source != null) { + /* + * If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a + * link remote close. + * */ + if (exists) { + source = new org.apache.qpid.proton.amqp.messaging.Source(); + source.setAddress(queue); + source.setDurable(TerminusDurability.UNSETTLED_STATE); + source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + source.setDistributionMode(COPY); + source.setCapabilities(TOPIC); + sender.setSource(source); + } + else { + sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + sender.getName())); + sender.close(); + } + } + else { if (source.getDynamic()) { //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // will be deleted on closing of the session @@ -141,7 +179,36 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple else { //if not dynamic then we use the targets address as the address to forward the messages to, however there has to //be a queue bound to it so we nee to check this. - queue = source.getAddress(); + + + if (isPubSub) { + // if we are a subscription and durable create a durable queue using the container id and link name + if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || + TerminusDurability.CONFIGURATION.equals(source.getDurable())) { + String clientId = connection.getRemoteContainer(); + String pubId = sender.getName(); + queue = clientId + ":" + pubId; + boolean exists = sessionSPI.queueQuery(queue); + if (!exists) { + sessionSPI.createDurableQueue(source.getAddress(), queue); + } + } + //otherwise we are a volatile subscription + else { + queue = java.util.UUID.randomUUID().toString(); + try { + sessionSPI.createTemporaryQueue(source.getAddress(), queue); + } + catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); + } + source.setAddress(queue); + } + + } + else { + queue = source.getAddress(); + } if (queue == null) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet(); } @@ -156,7 +223,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple } } - boolean browseOnly = source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); + boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); try { brokerConsumer = sessionSPI.createSender(this, queue, selector, browseOnly); } @@ -166,6 +233,12 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple } } + private boolean isPubSub(Source source) { + String pubSubPrefix = sessionSPI.getPubSubPrefix(); + return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix); + } + + /* * close the session * */ @@ -185,10 +258,23 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple * close the session * */ @Override - public void close() throws ActiveMQAMQPException { - super.close(); + public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { + super.close(remoteLinkClose); + try { sessionSPI.closeSender(brokerConsumer); + //if this is a link close rather than a connection close or detach, we need to delete any durable resources for + // say pub subs + if (remoteLinkClose ) { + Source source = (Source)sender.getSource(); + if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) { + String address = source.getAddress(); + boolean exists = sessionSPI.queueQuery(address); + if (exists) { + sessionSPI.deleteQueue(address); + } + } + } } catch (Exception e) { e.printStackTrace(); @@ -277,4 +363,17 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple return performSend(serverMessage, message); } + private static boolean hasCapabilities(Symbol symbol, Source source) { + if (source != null) { + if (source.getCapabilities() != null) { + for (Symbol cap : source.getCapabilities()) { + if (symbol.equals(cap)) { + return true; + } + } + } + } + return false; + } + } diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java index 3578926c50..1e5839c98d 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java @@ -70,6 +70,26 @@ public class MinimalSessionSPI implements AMQPSessionCallback { } + @Override + public void createDurableQueue(String address, String queueName) throws Exception { + + } + + @Override + public void createTemporaryQueue(String address, String queueName) throws Exception { + + } + + @Override + public void deleteQueue(String address) throws Exception { + + } + + @Override + public String getPubSubPrefix() { + return null; + } + @Override public void onFlowConsumer(Object consumer, int credits, boolean drain) { } diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md index 5baa7cba3b..65bacef9cc 100644 --- a/docs/user-manual/en/protocols-interoperability.md +++ b/docs/user-manual/en/protocols-interoperability.md @@ -86,6 +86,21 @@ does not exist then an exception will be sent > For the next version we will add a flag to aut create durable queue > but for now you will have to add them via the configuration +### AMQP and Topics + +Although amqp has no notion of topics it is still possible to treat amqp consumers or receivers as subscriptions rather +than just consumers on a queue. By default any receiving link that attaches to an address with the prefix `jms.topic.` +will be treated as a subscription and a subscription queue will be created. If the Terminus Durability is either UNSETTLED_STATE +or CONFIGURATION then the queue will be made durable, similar to a JMS durable subscription and given a name made up from +the container id and the link name, something like `my-container-id:my-link-name`. if the Terminus Durability is configured +as NONE then a volatile queue will be created. + +The prefix can be changed by configuring the Acceptor and setting the `pubSubPrefix` like so + +> tcp://0.0.0.0:5672?protocols=AMQP;pubSubPrefix=foo.bar. + +Artemis also supports the qpid-jms client and will respect its use of topics regardless of the prefix used for the address. + ### AMQP and Coordinations - Handling Transactions An AMQP links target can also be a Coordinator, the Coordinator is used diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index e3106fcf9f..90f54254f8 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -140,6 +140,18 @@ artemis-openwire-protocol ${project.version} + + org.apache.activemq + artemis-proton-plug + ${project.version} + + + org.apache.activemq + artemis-proton-plug + ${project.version} + test + test-jar + org.apache.activemq artemis-hornetq-protocol diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java new file mode 100644 index 0000000000..bf4e38cbee --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.proton; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.proton.plug.AMQPClientConnectionContext; +import org.proton.plug.AMQPClientReceiverContext; +import org.proton.plug.AMQPClientSessionContext; +import org.proton.plug.test.Constants; +import org.proton.plug.test.minimalclient.SimpleAMQPConnector; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + + +public class ProtonPubSubTest extends ActiveMQTestBase { + private final String prefix = "foo.bar."; + private final String pubAddress = "pubAddress"; + private final String prefixedPubAddress = prefix + "pubAddress"; + private final SimpleString ssPubAddress = new SimpleString(pubAddress); + private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress); + private ActiveMQServer server; + private Connection connection; + private JmsConnectionFactory factory; + + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + disableCheckThread(); + server = this.createServer(true, true); + HashMap params = new HashMap<>(); + params.put(TransportConstants.PORT_PROP_NAME, "5672"); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP"); + HashMap extraParams = new HashMap<>(); + extraParams.put("pubSubPrefix", prefix); + TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "foo", extraParams); + + server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); + server.start(); + server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true); + server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true); + factory = new JmsConnectionFactory("amqp://localhost:5672"); + factory.setClientID("myClientID"); + connection = factory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + } + + @Override + @After + public void tearDown() throws Exception { + try { + Thread.sleep(250); + if (connection != null) { + connection.close(); + } + + server.stop(); + } + finally { + super.tearDown(); + } + } + + @Test + public void testNonDurablePubSub() throws Exception { + int numMessages = 100; + Topic topic = createTopic(pubAddress); + TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer sub = session.createSubscriber(topic); + + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(topic); + connection.start(); + for (int i = 0; i < numMessages; i++) { + producer.send(sendSession.createTextMessage("message:" + i)); + } + for (int i = 0; i < numMessages; i++) { + TextMessage receive = (TextMessage) sub.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + } + } + + @Test + public void testNonDurableMultiplePubSub() throws Exception { + int numMessages = 100; + Topic topic = createTopic(pubAddress); + TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer sub = session.createSubscriber(topic); + MessageConsumer sub2 = session.createSubscriber(topic); + MessageConsumer sub3 = session.createSubscriber(topic); + + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(topic); + connection.start(); + for (int i = 0; i < numMessages; i++) { + producer.send(sendSession.createTextMessage("message:" + i)); + } + for (int i = 0; i < numMessages; i++) { + TextMessage receive = (TextMessage) sub.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + receive = (TextMessage) sub2.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + receive = (TextMessage) sub3.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + } + } + + + @Test + public void testDurablePubSub() throws Exception { + int numMessages = 100; + Topic topic = createTopic(pubAddress); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId"); + + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(topic); + connection.start(); + for (int i = 0; i < numMessages; i++) { + producer.send(sendSession.createTextMessage("message:" + i)); + } + for (int i = 0; i < numMessages; i++) { + TextMessage receive = (TextMessage) sub.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + } + } + + @Test + public void testDurableMultiplePubSub() throws Exception { + int numMessages = 100; + Topic topic = createTopic(pubAddress); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId"); + TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2"); + TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3"); + + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(topic); + connection.start(); + for (int i = 0; i < numMessages; i++) { + producer.send(sendSession.createTextMessage("message:" + i)); + } + for (int i = 0; i < numMessages; i++) { + TextMessage receive = (TextMessage) sub.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + receive = (TextMessage) sub2.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + receive = (TextMessage) sub3.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + } + } + + @Test + public void testDurablePubSubReconnect() throws Exception { + int numMessages = 100; + Topic topic = createTopic(pubAddress); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId"); + + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(topic); + connection.start(); + for (int i = 0; i < numMessages; i++) { + producer.send(sendSession.createTextMessage("message:" + i)); + } + for (int i = 0; i < numMessages; i++) { + TextMessage receive = (TextMessage) sub.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + } + connection.close(); + connection = factory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + sub = session.createDurableSubscriber(topic, "myPubId"); + + sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = sendSession.createProducer(topic); + connection.start(); + for (int i = 0; i < numMessages; i++) { + producer.send(sendSession.createTextMessage("message:" + i)); + } + for (int i = 0; i < numMessages; i++) { + TextMessage receive = (TextMessage) sub.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + } + } + + @Test + public void testDurablePubSubUnsubscribe() throws Exception { + int numMessages = 100; + Topic topic = createTopic(pubAddress); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId"); + + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(topic); + connection.start(); + for (int i = 0; i < numMessages; i++) { + producer.send(sendSession.createTextMessage("message:" + i)); + } + for (int i = 0; i < numMessages; i++) { + TextMessage receive = (TextMessage) sub.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals(receive.getText(), "message:" + i); + } + sub.close(); + session.unsubscribe("myPubId"); + } + + + @Test + public void testPubSubWithSimpleClient() throws Exception { + SimpleAMQPConnector connector = new SimpleAMQPConnector(); + connector.start(); + AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT); + + clientConnection.setContainer("myContainerID"); + + clientConnection.clientOpen(null); + + AMQPClientSessionContext clientSession = clientConnection.createClientSession(); + AMQPClientReceiverContext receiver = clientSession.createReceiver(prefixedPubAddress); + int numMessages = 100; + Topic topic = createTopic(prefixedPubAddress); + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(topic); + connection.start(); + for (int i = 0; i < numMessages; i++) { + producer.send(sendSession.createTextMessage("message:" + i)); + } + + receiver.flow(100); + for (int i = 0; i < numMessages; i++) { + ProtonJMessage protonJMessage = receiver.receiveMessage(5000, TimeUnit.MILLISECONDS); + assertNotNull(protonJMessage); + assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i); + } + + } + + + @Test + public void testMultiplePubSubWithSimpleClient() throws Exception { + SimpleAMQPConnector connector = new SimpleAMQPConnector(); + connector.start(); + AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT); + + clientConnection.setContainer("myContainerID"); + + clientConnection.clientOpen(null); + + AMQPClientSessionContext clientSession = clientConnection.createClientSession(); + AMQPClientReceiverContext receiver = clientSession.createReceiver("sub1", prefixedPubAddress); + AMQPClientReceiverContext receiver2 = clientSession.createReceiver("sub2", prefixedPubAddress); + AMQPClientReceiverContext receiver3 = clientSession.createReceiver("sub3", prefixedPubAddress); + int numMessages = 100; + Topic topic = createTopic(prefixedPubAddress); + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(topic); + receiver.flow(100); + receiver2.flow(100); + receiver3.flow(100); + connection.start(); + for (int i = 0; i < numMessages; i++) { + producer.send(sendSession.createTextMessage("message:" + i)); + } + + for (int i = 0; i < numMessages; i++) { + ProtonJMessage protonJMessage = receiver.receiveMessage(5000, TimeUnit.MILLISECONDS); + assertNotNull("did not get message " + i, protonJMessage); + assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i); + protonJMessage = receiver2.receiveMessage(5000, TimeUnit.MILLISECONDS); + assertNotNull("did not get message " + i, protonJMessage); + assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i); + protonJMessage = receiver3.receiveMessage(5000, TimeUnit.MILLISECONDS); + assertNotNull("did not get message " + i, protonJMessage); + assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i); + } + + } + + + private javax.jms.Topic createTopic(String address) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try { + return session.createTopic(address); + } + finally { + session.close(); + } + } +}