ARTEMIS-723 - AMQP subscriptions aren't deleted properly

https://issues.apache.org/jira/browse/ARTEMIS-723
This commit is contained in:
Andy Taylor 2016-09-09 10:36:01 +01:00
parent abed0cd5b9
commit cdb0391c1c
3 changed files with 123 additions and 2 deletions

View File

@ -195,7 +195,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
@Override
public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
}
@Override

View File

@ -272,7 +272,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
try {
sessionSPI.closeSender(brokerConsumer);
//if this is a link close rather than a connection close or detach, we need to delete any durable resources for
@ -285,6 +284,15 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
if (exists) {
sessionSPI.deleteQueue(address);
}
else {
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
String queue = clientId + ":" + pubId;
exists = sessionSPI.queueQuery(queue);
if (exists) {
sessionSPI.deleteQueue(queue);
}
}
}
}
}

View File

@ -39,6 +39,9 @@ import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
@ -56,6 +59,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -183,6 +188,62 @@ public class ProtonTest extends ProtonTestBase {
}
}
@Test
public void testDurableSubscriptionUnsubscribe() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
Connection connection = createConnection("myClientId");
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("amqp_testtopic");
TopicSubscriber myDurSub = session.createDurableSubscriber(topic, "myDurSub");
session.close();
connection.close();
connection = createConnection("myClientId");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
myDurSub = session.createDurableSubscriber(topic, "myDurSub");
myDurSub.close();
Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
session.unsubscribe("myDurSub");
Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
session.close();
connection.close();
}
finally {
if (connection != null) {
connection.close();
}
}
}
@Test
public void testTemporarySubscriptionDeleted() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
try {
TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("amqp_testtopic");
TopicSubscriber myDurSub = session.createSubscriber(topic);
Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
Assert.assertEquals(2, bindingsForAddress.getBindings().size());
session.close();
final CountDownLatch latch = new CountDownLatch(1);
server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() {
@Override
public void connectionClosed() {
latch.countDown();
}
});
connection.close();
latch.await(5, TimeUnit.SECONDS);
bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
Assert.assertEquals(1, bindingsForAddress.getBindings().size());
}
finally {
if (connection != null) {
connection.close();
}
}
}
@Test
public void testBrokerContainerId() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
@ -1530,6 +1591,58 @@ public class ProtonTest extends ProtonTestBase {
return connection;
}
private javax.jms.Connection createConnection(String clientId) throws JMSException {
Connection connection;
if (protocol == 3) {
factory = new JmsConnectionFactory(amqpConnectionUri);
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.setClientID(clientId);
connection.start();
}
else if (protocol == 0) {
factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.setClientID(clientId);
connection.start();
}
else {
TransportConfiguration transport;
if (protocol == 1) {
transport = new TransportConfiguration(INVM_CONNECTOR_FACTORY);
factory = new ActiveMQConnectionFactory("vm:/0");
}
else {
factory = new ActiveMQConnectionFactory();
}
connection = factory.createConnection(userName, password);
connection.setClientID(clientId);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
}
return connection;
}
private void setAddressFullBlockPolicy() {
// For BLOCK tests
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");