mirror of https://github.com/apache/activemq.git
Add some additional tests for durable subscription recovery and lookup.
This commit is contained in:
parent
c5a1b86062
commit
29fb4a4b3f
|
@ -43,6 +43,11 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean isPersistent() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testCreateDurableReceiver() throws Exception {
|
public void testCreateDurableReceiver() throws Exception {
|
||||||
|
|
||||||
|
@ -142,6 +147,49 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testReattachToDurableNodeAfterRestart() throws Exception {
|
||||||
|
|
||||||
|
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = client.createConnection();
|
||||||
|
connection.setContainerId(getTestName());
|
||||||
|
connection.connect();
|
||||||
|
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
|
||||||
|
|
||||||
|
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||||
|
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||||
|
|
||||||
|
receiver.detach();
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
restartBroker();
|
||||||
|
|
||||||
|
connection = client.createConnection();
|
||||||
|
connection.setContainerId(getTestName());
|
||||||
|
connection.connect();
|
||||||
|
session = connection.createSession();
|
||||||
|
|
||||||
|
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
|
||||||
|
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||||
|
|
||||||
|
receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
|
||||||
|
|
||||||
|
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||||
|
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||||
|
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
|
||||||
|
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testLookupExistingSubscription() throws Exception {
|
public void testLookupExistingSubscription() throws Exception {
|
||||||
|
|
||||||
|
@ -186,6 +234,57 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testLookupExistingSubscriptionAfterRestart() throws Exception {
|
||||||
|
|
||||||
|
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = client.createConnection();
|
||||||
|
connection.setContainerId(getTestName());
|
||||||
|
connection.connect();
|
||||||
|
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
|
||||||
|
|
||||||
|
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||||
|
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||||
|
|
||||||
|
receiver.detach();
|
||||||
|
|
||||||
|
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
|
||||||
|
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||||
|
|
||||||
|
restartBroker();
|
||||||
|
|
||||||
|
connection = client.createConnection();
|
||||||
|
connection.setContainerId(getTestName());
|
||||||
|
connection.connect();
|
||||||
|
|
||||||
|
session = connection.createSession();
|
||||||
|
receiver = session.lookupSubscription(getTestName());
|
||||||
|
|
||||||
|
assertNotNull(receiver);
|
||||||
|
|
||||||
|
Receiver protonReceiver = receiver.getReceiver();
|
||||||
|
assertNotNull(protonReceiver.getRemoteSource());
|
||||||
|
Source remoteSource = (Source) protonReceiver.getRemoteSource();
|
||||||
|
|
||||||
|
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
|
||||||
|
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
|
||||||
|
assertEquals(COPY, remoteSource.getDistributionMode());
|
||||||
|
|
||||||
|
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
|
||||||
|
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||||
|
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
|
||||||
|
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testLookupNonExistingSubscription() throws Exception {
|
public void testLookupNonExistingSubscription() throws Exception {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue