mirror of https://github.com/apache/activemq.git
NO-JIRA: Add some additional test variations and add some more checks
This commit is contained in:
parent
f71e0ee15b
commit
3e237ca73a
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
|
@ -31,6 +31,7 @@ import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
|
|||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.apache.qpid.proton.amqp.DescribedType;
|
||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
|
||||
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
||||
|
@ -42,6 +43,8 @@ import org.junit.Test;
|
|||
*/
|
||||
public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
||||
|
||||
private final String SELECTOR_STRING = "color = red";
|
||||
|
||||
@Override
|
||||
protected boolean isUseOpenWireConnector() {
|
||||
return true;
|
||||
|
@ -244,7 +247,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception {
|
||||
public void testLookupExistingSubscriptionWithSelector() throws Exception {
|
||||
|
||||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
|
@ -254,7 +257,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
connection.connect();
|
||||
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), "color = red", true);
|
||||
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), SELECTOR_STRING);
|
||||
|
||||
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
@ -272,10 +275,107 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
assertNotNull(protonReceiver.getRemoteSource());
|
||||
Source remoteSource = (Source) protonReceiver.getRemoteSource();
|
||||
|
||||
if (remoteSource.getFilter() != null) {
|
||||
assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
|
||||
assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
|
||||
}
|
||||
assertNotNull(remoteSource.getFilter());
|
||||
assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
|
||||
assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
|
||||
assertEquals(SELECTOR_STRING, ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed());
|
||||
|
||||
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
|
||||
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
|
||||
assertEquals(COPY, remoteSource.getDistributionMode());
|
||||
|
||||
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
||||
receiver.close();
|
||||
|
||||
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testLookupExistingSubscriptionWithNoLocal() throws Exception {
|
||||
|
||||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), null, true);
|
||||
|
||||
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
||||
receiver.detach();
|
||||
|
||||
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
||||
receiver = session.lookupSubscription(getTestName());
|
||||
|
||||
assertNotNull(receiver);
|
||||
|
||||
Receiver protonReceiver = receiver.getReceiver();
|
||||
assertNotNull(protonReceiver.getRemoteSource());
|
||||
Source remoteSource = (Source) protonReceiver.getRemoteSource();
|
||||
|
||||
assertNotNull(remoteSource.getFilter());
|
||||
assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
|
||||
assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
|
||||
|
||||
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
|
||||
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
|
||||
assertEquals(COPY, remoteSource.getDistributionMode());
|
||||
|
||||
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
||||
receiver.close();
|
||||
|
||||
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception {
|
||||
|
||||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), SELECTOR_STRING, true);
|
||||
|
||||
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
||||
receiver.detach();
|
||||
|
||||
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
||||
receiver = session.lookupSubscription(getTestName());
|
||||
|
||||
assertNotNull(receiver);
|
||||
|
||||
Receiver protonReceiver = receiver.getReceiver();
|
||||
assertNotNull(protonReceiver.getRemoteSource());
|
||||
Source remoteSource = (Source) protonReceiver.getRemoteSource();
|
||||
|
||||
assertNotNull(remoteSource.getFilter());
|
||||
assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
|
||||
assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
|
||||
assertEquals(SELECTOR_STRING, ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed());
|
||||
|
||||
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
|
||||
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
|
||||
|
@ -303,7 +403,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
connection.connect();
|
||||
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), "color = red", true);
|
||||
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), SELECTOR_STRING, true);
|
||||
|
||||
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||
|
@ -328,10 +428,10 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
assertNotNull(protonReceiver.getRemoteSource());
|
||||
Source remoteSource = (Source) protonReceiver.getRemoteSource();
|
||||
|
||||
if (remoteSource.getFilter() != null) {
|
||||
assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
|
||||
assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
|
||||
}
|
||||
assertNotNull(remoteSource.getFilter());
|
||||
assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
|
||||
assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
|
||||
assertEquals(SELECTOR_STRING, ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed());
|
||||
|
||||
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
|
||||
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
|
||||
|
|
Loading…
Reference in New Issue