Tests around durable subscription lookup and reattach.
This commit is contained in:
Timothy Bish 2015-03-18 14:59:49 -04:00
parent 20832f1f1b
commit 4228e3d3e8
4 changed files with 161 additions and 44 deletions

View File

@ -1460,6 +1460,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
source.setAddress(destination.getQualifiedName());
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDistributionMode(COPY);
} else {
consumerContext.closed = true;
sender.setSource(null);

View File

@ -89,6 +89,11 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* The unique ID assigned to this receiver.
*/
public AmqpReceiver(AmqpSession session, String address, String receiverId) {
if (address != null && address.isEmpty()) {
throw new IllegalArgumentException("Address cannot be empty.");
}
this.session = session;
this.address = address;
this.receiverId = receiverId;
@ -449,7 +454,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
Source source = userSpecifiedSource;
Target target = new Target();
if (userSpecifiedSource == null) {
if (userSpecifiedSource == null && address != null) {
source = new Source();
source.setAddress(address);
configureSource(source);

View File

@ -267,6 +267,43 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return receiver;
}
/**
* Create a receiver instance using the given address that creates a durable subscription.
*
* @param subscriptionName
* the name of the subscription that should be queried for on the remote..
*
* @return a newly created receiver that is ready for use if the subscription exists.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver lookupSubscription(String subscriptionName) throws Exception {
checkClosed();
if (subscriptionName == null || subscriptionName.isEmpty()) {
throw new IllegalArgumentException("subscription name must not be null or empty.");
}
final ClientFuture request = new ClientFuture();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, (String) null, getNextReceiverId());
receiver.setSubscriptionName(subscriptionName);
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport();
}
});
request.sync();
return receiver;
}
/**
* @return this session's parent AmqpConnection.
*/

View File

@ -16,9 +16,10 @@
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -26,7 +27,10 @@ import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test;
/**
@ -52,14 +56,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getDurableTopicSubscribers().length == 1;
}
}, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10)));
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
connection.close();
}
@ -77,25 +74,13 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getDurableTopicSubscribers().length == 1;
}
}, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10)));
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
receiver.detach();
assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getInactiveDurableTopicSubscribers().length == 1;
}
}, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10)));
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
connection.close();
}
@ -113,26 +98,115 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getDurableTopicSubscribers().length == 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
receiver.close();
assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() {
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
@Override
public boolean isSatisified() throws Exception {
return brokerView.getDurableTopicSubscribers().length == 0 &&
brokerView.getInactiveDurableTopicSubscribers().length == 0;
}
connection.close();
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
@Test(timeout = 60000)
public void testReattachToDurableNode() throws Exception {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
connection.setContainerId(getTestName());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
receiver.detach();
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
receiver.close();
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
connection.close();
}
@Test(timeout = 60000)
public void testLookupExistingSubscription() throws Exception {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
connection.setContainerId(getTestName());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
receiver.detach();
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
receiver = session.lookupSubscription(getTestName());
assertNotNull(receiver);
Receiver protonReceiver = receiver.getReceiver();
assertNotNull(protonReceiver.getRemoteSource());
Source remoteSource = (Source) protonReceiver.getRemoteSource();
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
assertEquals(COPY, remoteSource.getDistributionMode());
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
receiver.close();
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
connection.close();
}
@Test(timeout = 60000)
public void testLookupNonExistingSubscription() throws Exception {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
connection.setContainerId(getTestName());
connection.connect();
AmqpSession session = connection.createSession();
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
try {
session.lookupSubscription(getTestName());
fail("Should throw an exception since there is not subscription");
} catch (Exception e) {
LOG.info("Error on lookup: {}", e.getMessage());
}
connection.close();
}