From 4228e3d3e812436585ea7e7746f8eb42a7a45493 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 18 Mar 2015 14:59:49 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5666 Tests around durable subscription lookup and reattach. --- .../transport/amqp/AmqpProtocolConverter.java | 1 + .../transport/amqp/client/AmqpReceiver.java | 7 +- .../transport/amqp/client/AmqpSession.java | 37 ++++ .../amqp/interop/AmqpDurableReceiverTest.java | 160 +++++++++++++----- 4 files changed, 161 insertions(+), 44 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 5a73a25a76..d1e9f5ae7f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -1460,6 +1460,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { source.setAddress(destination.getQualifiedName()); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + source.setDistributionMode(COPY); } else { consumerContext.closed = true; sender.setSource(null); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 1290d270cd..585ba93c7e 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -89,6 +89,11 @@ public class AmqpReceiver extends AmqpAbstractResource { * The unique ID assigned to this receiver. */ public AmqpReceiver(AmqpSession session, String address, String receiverId) { + + if (address != null && address.isEmpty()) { + throw new IllegalArgumentException("Address cannot be empty."); + } + this.session = session; this.address = address; this.receiverId = receiverId; @@ -449,7 +454,7 @@ public class AmqpReceiver extends AmqpAbstractResource { Source source = userSpecifiedSource; Target target = new Target(); - if (userSpecifiedSource == null) { + if (userSpecifiedSource == null && address != null) { source = new Source(); source.setAddress(address); configureSource(source); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 8b039b682f..3b2a3d1dab 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -267,6 +267,43 @@ public class AmqpSession extends AmqpAbstractResource { return receiver; } + /** + * Create a receiver instance using the given address that creates a durable subscription. + * + * @param subscriptionName + * the name of the subscription that should be queried for on the remote.. + * + * @return a newly created receiver that is ready for use if the subscription exists. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver lookupSubscription(String subscriptionName) throws Exception { + checkClosed(); + + if (subscriptionName == null || subscriptionName.isEmpty()) { + throw new IllegalArgumentException("subscription name must not be null or empty."); + } + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, (String) null, getNextReceiverId()); + receiver.setSubscriptionName(subscriptionName); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(); + } + }); + + request.sync(); + + return receiver; + } + /** * @return this session's parent AmqpConnection. */ diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java index 7fd608079a..028991d9db 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java @@ -16,9 +16,10 @@ */ package org.apache.activemq.transport.amqp.interop; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.TimeUnit; +import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -26,7 +27,10 @@ import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.engine.Receiver; import org.junit.Test; /** @@ -52,14 +56,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { final BrokerViewMBean brokerView = getProxyToBroker(); - assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return brokerView.getDurableTopicSubscribers().length == 1; - } - - }, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10))); + assertEquals(1, brokerView.getDurableTopicSubscribers().length); connection.close(); } @@ -77,25 +74,13 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { final BrokerViewMBean brokerView = getProxyToBroker(); - assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return brokerView.getDurableTopicSubscribers().length == 1; - } - - }, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10))); + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); receiver.detach(); - assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return brokerView.getInactiveDurableTopicSubscribers().length == 1; - } - - }, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10))); + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length); connection.close(); } @@ -113,26 +98,115 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { final BrokerViewMBean brokerView = getProxyToBroker(); - assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return brokerView.getDurableTopicSubscribers().length == 1; - } - - }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10))); + assertEquals(1, brokerView.getDurableTopicSubscribers().length); receiver.close(); - assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() { + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); - @Override - public boolean isSatisified() throws Exception { - return brokerView.getDurableTopicSubscribers().length == 0 && - brokerView.getInactiveDurableTopicSubscribers().length == 0; - } + connection.close(); + } - }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10))); + @Test(timeout = 60000) + public void testReattachToDurableNode() throws Exception { + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getTestName()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName()); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.detach(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName()); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.close(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testLookupExistingSubscription() throws Exception { + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getTestName()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName()); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.detach(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver = session.lookupSubscription(getTestName()); + + assertNotNull(receiver); + + Receiver protonReceiver = receiver.getReceiver(); + assertNotNull(protonReceiver.getRemoteSource()); + Source remoteSource = (Source) protonReceiver.getRemoteSource(); + + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); + assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); + assertEquals(COPY, remoteSource.getDistributionMode()); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.close(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testLookupNonExistingSubscription() throws Exception { + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getTestName()); + connection.connect(); + + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + try { + session.lookupSubscription(getTestName()); + fail("Should throw an exception since there is not subscription"); + } catch (Exception e) { + LOG.info("Error on lookup: {}", e.getMessage()); + } connection.close(); }