ARTEMIS-535 - Improve amqp protocol to support topics

https://issues.apache.org/jira/browse/ARTEMIS-535
This commit is contained in:
Andy Taylor 2016-05-24 11:59:04 +01:00
parent cd088888b6
commit 73f908b8b4
18 changed files with 605 additions and 27 deletions

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@ -54,6 +55,12 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
private final ProtonProtocolManagerFactory factory;
/*
* used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
* the address. This can be changed on the acceptor.
* */
private String pubSubPrefix = ActiveMQTopic.JMS_TOPIC_ADDRESS_PREFIX;
public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@ -139,4 +146,13 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
public String getPubSubPrefix() {
return pubSubPrefix;
}
public void setPubSubPrefix(String pubSubPrefix) {
this.pubSubPrefix = pubSubPrefix;
}
}

View File

@ -184,6 +184,16 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
}
@Override
public void createTemporaryQueue(String address, String queueName) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
}
@Override
public void createDurableQueue(String address, String queueName) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
}
@Override
public boolean queueQuery(String queueName) throws Exception {
boolean queryResult = false;
@ -360,6 +370,16 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
}
@Override
public String getPubSubPrefix() {
return manager.getPubSubPrefix();
}
@Override
public void deleteQueue(String address) throws Exception {
manager.getServer().destroyQueue(new SimpleString(address));
}
private void resetContext() {
manager.getServer().getStorageManager().setContext(null);
}

View File

@ -31,4 +31,6 @@ public interface AMQPClientConnectionContext extends AMQPConnectionContext {
void clientOpen(ClientSASL sasl) throws Exception;
AMQPClientSessionContext createClientSession() throws ActiveMQAMQPException;
void setContainer(String containerID);
}

View File

@ -23,4 +23,6 @@ public interface AMQPClientSessionContext extends AMQPSessionContext {
AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException;
AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException;
AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException;
}

View File

@ -40,6 +40,12 @@ public interface AMQPSessionCallback {
void createTemporaryQueue(String queueName) throws Exception;
void createTemporaryQueue(String address, String queueName) throws Exception;
void createDurableQueue(String address, String queueName) throws Exception;
void deleteQueue(String address) throws Exception;
boolean queueQuery(String queueName) throws Exception;
void closeSender(Object brokerConsumer) throws Exception;
@ -82,4 +88,5 @@ public interface AMQPSessionCallback {
int messageFormat,
ByteBuf messageEncoded) throws Exception;
String getPubSubPrefix();
}

View File

@ -32,6 +32,7 @@ import org.apache.qpid.proton.engine.Transport;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.SASLResult;
import org.proton.plug.context.server.ProtonServerSenderContext;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.handler.ProtonHandler;
import org.proton.plug.handler.impl.DefaultEventHandler;
@ -163,6 +164,14 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
}
}
public String getRemoteContainer() {
return handler.getConnection().getRemoteContainer();
}
public String getPubSubPrefix() {
return null;
}
// This listener will perform a bunch of things here
class LocalListener extends DefaultEventHandler {
@ -265,7 +274,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
link.close();
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
linkContext.close();
linkContext.close(true);
}
}
@ -274,6 +283,15 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
link.detach();
}
@Override
public void onDetach(Link link) throws Exception {
Object context = link.getContext();
if (context instanceof ProtonServerSenderContext) {
ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
senderContext.close(false);
}
}
@Override
public void onDelivery(Delivery delivery) throws Exception {
ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
@ -289,4 +307,6 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
}
}

View File

@ -67,7 +67,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im
* close the session
* */
@Override
public void close() throws ActiveMQAMQPException {
public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
closed = true;
protonSession.removeSender(sender);
synchronized (connection.getLock()) {
@ -84,7 +84,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
closed = true;
sender.setCondition(condition);
close();
close(false);
}
@Override

View File

@ -53,14 +53,14 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
}
@Override
public void close() throws ActiveMQAMQPException {
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
protonSession.removeReceiver(receiver);
}
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
receiver.setCondition(condition);
close();
close(false);
}
public void flow(int credits) {

View File

@ -85,7 +85,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
AbstractProtonContextSender protonConsumer = senders.remove(consumer);
if (protonConsumer != null) {
try {
protonConsumer.close();
protonConsumer.close(false);
}
catch (ActiveMQAMQPException e) {
protonConsumer.getSender().setTarget(null);
@ -116,7 +116,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
for (AbstractProtonReceiverContext protonProducer : receiversCopy) {
try {
protonProducer.close();
protonProducer.close(false);
}
catch (Exception e) {
e.printStackTrace();
@ -130,7 +130,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
for (AbstractProtonContextSender protonConsumer : protonSendersClone) {
try {
protonConsumer.close();
protonConsumer.close(false);
}
catch (Exception e) {
e.printStackTrace();

View File

@ -29,7 +29,11 @@ public interface ProtonDeliveryHandler {
void onMessage(Delivery delivery) throws ActiveMQAMQPException;
void close() throws ActiveMQAMQPException;
/*
* we have to distinguish between a remote close on the link and a close via a connection or session as the latter mean
* that a link reattach can happen and we need to keep the underlying resource (queue/subscription) around for pub subs
* */
void close(boolean remoteLinkClose) throws ActiveMQAMQPException;
void close(ErrorCondition condition) throws ActiveMQAMQPException;
}

View File

@ -116,7 +116,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
}
@Override
public void close() throws ActiveMQAMQPException {
public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
//noop
}

View File

@ -82,6 +82,11 @@ public class ProtonClientConnectionContext extends AbstractConnectionContext imp
return sessionImpl;
}
@Override
public void setContainer(String containerID) {
handler.getConnection().setContainer(containerID);
}
@Override
protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);

View File

@ -64,12 +64,17 @@ public class ProtonClientSessionContext extends AbstractProtonSessionContext imp
@Override
public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException {
return createReceiver(address, address);
}
@Override
public AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException {
FutureRunnable futureRunnable = new FutureRunnable(1);
ProtonClientReceiverContext amqpReceiver;
synchronized (connection.getLock()) {
Receiver receiver = session.receiver(address);
Receiver receiver = session.receiver(name);
Source source = new Source();
source.setAddress(address);
receiver.setSource(source);

View File

@ -26,6 +26,8 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@ -50,6 +52,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
private static final Symbol COPY = Symbol.valueOf("copy");
private static final Symbol TOPIC = Symbol.valueOf("topic");
private Object brokerConsumer;
@ -81,7 +84,10 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
//todo add flow control
try {
// to do whatever you need to make the broker start sending messages to the consumer
sessionSPI.startSender(brokerConsumer);
//this could be null if a link reattach has happened
if (brokerConsumer != null) {
sessionSPI.startSender(brokerConsumer);
}
//protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
}
catch (Exception e) {
@ -105,26 +111,58 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
/*
* even tho the filter is a map it will only return a single filter unless a nolocal is also provided
* */
Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
if (filter != null) {
selector = filter.getValue().getDescribed().toString();
// Validate the Selector.
try {
SelectorParser.parse(selector);
}
catch (FilterException e) {
close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
return;
if (source != null) {
Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
if (filter != null) {
selector = filter.getValue().getDescribed().toString();
// Validate the Selector.
try {
SelectorParser.parse(selector);
}
catch (FilterException e) {
close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
return;
}
}
}
/*
* if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act
* like a subscription.
* */
boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
//filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
//if (filter != null) {
//todo implement nolocal filter
//}
if (source == null) {
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
queue = clientId + ":" + pubId;
boolean exists = sessionSPI.queueQuery(queue);
if (source != null) {
/*
* If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a
* link remote close.
* */
if (exists) {
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(queue);
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDistributionMode(COPY);
source.setCapabilities(TOPIC);
sender.setSource(source);
}
else {
sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + sender.getName()));
sender.close();
}
}
else {
if (source.getDynamic()) {
//if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session
@ -141,7 +179,36 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
else {
//if not dynamic then we use the targets address as the address to forward the messages to, however there has to
//be a queue bound to it so we nee to check this.
queue = source.getAddress();
if (isPubSub) {
// if we are a subscription and durable create a durable queue using the container id and link name
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
queue = clientId + ":" + pubId;
boolean exists = sessionSPI.queueQuery(queue);
if (!exists) {
sessionSPI.createDurableQueue(source.getAddress(), queue);
}
}
//otherwise we are a volatile subscription
else {
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue);
}
catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
source.setAddress(queue);
}
}
else {
queue = source.getAddress();
}
if (queue == null) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
}
@ -156,7 +223,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
}
}
boolean browseOnly = source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try {
brokerConsumer = sessionSPI.createSender(this, queue, selector, browseOnly);
}
@ -166,6 +233,12 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
}
}
private boolean isPubSub(Source source) {
String pubSubPrefix = sessionSPI.getPubSubPrefix();
return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
}
/*
* close the session
* */
@ -185,10 +258,23 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
* close the session
* */
@Override
public void close() throws ActiveMQAMQPException {
super.close();
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
try {
sessionSPI.closeSender(brokerConsumer);
//if this is a link close rather than a connection close or detach, we need to delete any durable resources for
// say pub subs
if (remoteLinkClose ) {
Source source = (Source)sender.getSource();
if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
String address = source.getAddress();
boolean exists = sessionSPI.queueQuery(address);
if (exists) {
sessionSPI.deleteQueue(address);
}
}
}
}
catch (Exception e) {
e.printStackTrace();
@ -277,4 +363,17 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
return performSend(serverMessage, message);
}
private static boolean hasCapabilities(Symbol symbol, Source source) {
if (source != null) {
if (source.getCapabilities() != null) {
for (Symbol cap : source.getCapabilities()) {
if (symbol.equals(cap)) {
return true;
}
}
}
}
return false;
}
}

View File

@ -70,6 +70,26 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
public void createDurableQueue(String address, String queueName) throws Exception {
}
@Override
public void createTemporaryQueue(String address, String queueName) throws Exception {
}
@Override
public void deleteQueue(String address) throws Exception {
}
@Override
public String getPubSubPrefix() {
return null;
}
@Override
public void onFlowConsumer(Object consumer, int credits, boolean drain) {
}

View File

@ -86,6 +86,21 @@ does not exist then an exception will be sent
> For the next version we will add a flag to aut create durable queue
> but for now you will have to add them via the configuration
### AMQP and Topics
Although amqp has no notion of topics it is still possible to treat amqp consumers or receivers as subscriptions rather
than just consumers on a queue. By default any receiving link that attaches to an address with the prefix `jms.topic.`
will be treated as a subscription and a subscription queue will be created. If the Terminus Durability is either UNSETTLED_STATE
or CONFIGURATION then the queue will be made durable, similar to a JMS durable subscription and given a name made up from
the container id and the link name, something like `my-container-id:my-link-name`. if the Terminus Durability is configured
as NONE then a volatile queue will be created.
The prefix can be changed by configuring the Acceptor and setting the `pubSubPrefix` like so
> <acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP;pubSubPrefix=foo.bar.</acceptor>
Artemis also supports the qpid-jms client and will respect its use of topics regardless of the prefix used for the address.
### AMQP and Coordinations - Handling Transactions
An AMQP links target can also be a Coordinator, the Coordinator is used

View File

@ -140,6 +140,18 @@
<artifactId>artemis-openwire-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-proton-plug</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-proton-plug</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-hornetq-protocol</artifactId>

View File

@ -0,0 +1,351 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.proton;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.AMQPClientReceiverContext;
import org.proton.plug.AMQPClientSessionContext;
import org.proton.plug.test.Constants;
import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
public class ProtonPubSubTest extends ActiveMQTestBase {
private final String prefix = "foo.bar.";
private final String pubAddress = "pubAddress";
private final String prefixedPubAddress = prefix + "pubAddress";
private final SimpleString ssPubAddress = new SimpleString(pubAddress);
private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress);
private ActiveMQServer server;
private Connection connection;
private JmsConnectionFactory factory;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
disableCheckThread();
server = this.createServer(true, true);
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, "5672");
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
HashMap<String, Object> extraParams = new HashMap<>();
extraParams.put("pubSubPrefix", prefix);
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "foo", extraParams);
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
server.start();
server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
factory = new JmsConnectionFactory("amqp://localhost:5672");
factory.setClientID("myClientID");
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
}
@Override
@After
public void tearDown() throws Exception {
try {
Thread.sleep(250);
if (connection != null) {
connection.close();
}
server.stop();
}
finally {
super.tearDown();
}
}
@Test
public void testNonDurablePubSub() throws Exception {
int numMessages = 100;
Topic topic = createTopic(pubAddress);
TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer sub = session.createSubscriber(topic);
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(topic);
connection.start();
for (int i = 0; i < numMessages; i++) {
producer.send(sendSession.createTextMessage("message:" + i));
}
for (int i = 0; i < numMessages; i++) {
TextMessage receive = (TextMessage) sub.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
}
}
@Test
public void testNonDurableMultiplePubSub() throws Exception {
int numMessages = 100;
Topic topic = createTopic(pubAddress);
TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer sub = session.createSubscriber(topic);
MessageConsumer sub2 = session.createSubscriber(topic);
MessageConsumer sub3 = session.createSubscriber(topic);
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(topic);
connection.start();
for (int i = 0; i < numMessages; i++) {
producer.send(sendSession.createTextMessage("message:" + i));
}
for (int i = 0; i < numMessages; i++) {
TextMessage receive = (TextMessage) sub.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
receive = (TextMessage) sub2.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
receive = (TextMessage) sub3.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
}
}
@Test
public void testDurablePubSub() throws Exception {
int numMessages = 100;
Topic topic = createTopic(pubAddress);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(topic);
connection.start();
for (int i = 0; i < numMessages; i++) {
producer.send(sendSession.createTextMessage("message:" + i));
}
for (int i = 0; i < numMessages; i++) {
TextMessage receive = (TextMessage) sub.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
}
}
@Test
public void testDurableMultiplePubSub() throws Exception {
int numMessages = 100;
Topic topic = createTopic(pubAddress);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2");
TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3");
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(topic);
connection.start();
for (int i = 0; i < numMessages; i++) {
producer.send(sendSession.createTextMessage("message:" + i));
}
for (int i = 0; i < numMessages; i++) {
TextMessage receive = (TextMessage) sub.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
receive = (TextMessage) sub2.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
receive = (TextMessage) sub3.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
}
}
@Test
public void testDurablePubSubReconnect() throws Exception {
int numMessages = 100;
Topic topic = createTopic(pubAddress);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(topic);
connection.start();
for (int i = 0; i < numMessages; i++) {
producer.send(sendSession.createTextMessage("message:" + i));
}
for (int i = 0; i < numMessages; i++) {
TextMessage receive = (TextMessage) sub.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
}
connection.close();
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
sub = session.createDurableSubscriber(topic, "myPubId");
sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = sendSession.createProducer(topic);
connection.start();
for (int i = 0; i < numMessages; i++) {
producer.send(sendSession.createTextMessage("message:" + i));
}
for (int i = 0; i < numMessages; i++) {
TextMessage receive = (TextMessage) sub.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
}
}
@Test
public void testDurablePubSubUnsubscribe() throws Exception {
int numMessages = 100;
Topic topic = createTopic(pubAddress);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(topic);
connection.start();
for (int i = 0; i < numMessages; i++) {
producer.send(sendSession.createTextMessage("message:" + i));
}
for (int i = 0; i < numMessages; i++) {
TextMessage receive = (TextMessage) sub.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals(receive.getText(), "message:" + i);
}
sub.close();
session.unsubscribe("myPubId");
}
@Test
public void testPubSubWithSimpleClient() throws Exception {
SimpleAMQPConnector connector = new SimpleAMQPConnector();
connector.start();
AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
clientConnection.setContainer("myContainerID");
clientConnection.clientOpen(null);
AMQPClientSessionContext clientSession = clientConnection.createClientSession();
AMQPClientReceiverContext receiver = clientSession.createReceiver(prefixedPubAddress);
int numMessages = 100;
Topic topic = createTopic(prefixedPubAddress);
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(topic);
connection.start();
for (int i = 0; i < numMessages; i++) {
producer.send(sendSession.createTextMessage("message:" + i));
}
receiver.flow(100);
for (int i = 0; i < numMessages; i++) {
ProtonJMessage protonJMessage = receiver.receiveMessage(5000, TimeUnit.MILLISECONDS);
assertNotNull(protonJMessage);
assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
}
}
@Test
public void testMultiplePubSubWithSimpleClient() throws Exception {
SimpleAMQPConnector connector = new SimpleAMQPConnector();
connector.start();
AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
clientConnection.setContainer("myContainerID");
clientConnection.clientOpen(null);
AMQPClientSessionContext clientSession = clientConnection.createClientSession();
AMQPClientReceiverContext receiver = clientSession.createReceiver("sub1", prefixedPubAddress);
AMQPClientReceiverContext receiver2 = clientSession.createReceiver("sub2", prefixedPubAddress);
AMQPClientReceiverContext receiver3 = clientSession.createReceiver("sub3", prefixedPubAddress);
int numMessages = 100;
Topic topic = createTopic(prefixedPubAddress);
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(topic);
receiver.flow(100);
receiver2.flow(100);
receiver3.flow(100);
connection.start();
for (int i = 0; i < numMessages; i++) {
producer.send(sendSession.createTextMessage("message:" + i));
}
for (int i = 0; i < numMessages; i++) {
ProtonJMessage protonJMessage = receiver.receiveMessage(5000, TimeUnit.MILLISECONDS);
assertNotNull("did not get message " + i, protonJMessage);
assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
protonJMessage = receiver2.receiveMessage(5000, TimeUnit.MILLISECONDS);
assertNotNull("did not get message " + i, protonJMessage);
assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
protonJMessage = receiver3.receiveMessage(5000, TimeUnit.MILLISECONDS);
assertNotNull("did not get message " + i, protonJMessage);
assertEquals(((AmqpValue) protonJMessage.getBody()).getValue(), "message:" + i);
}
}
private javax.jms.Topic createTopic(String address) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
return session.createTopic(address);
}
finally {
session.close();
}
}
}