ARTEMIS-4125 address can be removed inadvertently
When the last non-durable subscriber on a JMS topic disconnects the corresponding queue representing the subscription is deleted as expected. However, the queue's address will also be deleted no matter what, which is *not* expected.
This commit is contained in:
parent
58978ad94a
commit
114302a093
|
@ -1126,16 +1126,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
|
|
||||||
private void run() {
|
private void run() {
|
||||||
try {
|
try {
|
||||||
|
Binding binding = server.getPostOffice().getBinding(bindingName);
|
||||||
|
if (binding == null) {
|
||||||
|
// the queue may have already been deleted
|
||||||
|
return;
|
||||||
|
}
|
||||||
logger.debug("deleting temporary queue {}", bindingName);
|
logger.debug("deleting temporary queue {}", bindingName);
|
||||||
|
server.destroyQueue(bindingName, null, false, false, server.getAddressInfo(binding.getAddress()).isTemporary());
|
||||||
try {
|
if (observer != null) {
|
||||||
server.destroyQueue(bindingName, null, false, false, true);
|
observer.tempQueueDeleted(bindingName);
|
||||||
if (observer != null) {
|
|
||||||
observer.tempQueueDeleted(bindingName);
|
|
||||||
}
|
|
||||||
} catch (ActiveMQException e) {
|
|
||||||
// that's fine.. it can happen due to queue already been deleted
|
|
||||||
logger.debug(e.getMessage(), e);
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(bindingName, e);
|
ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(bindingName, e);
|
||||||
|
|
|
@ -22,6 +22,7 @@ import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||||
|
@ -185,6 +186,39 @@ public class AutoDeleteJmsDestinationTest extends JMSTestBase {
|
||||||
assertNull(server.getManagementService().getResource("jtest"));
|
assertNull(server.getManagementService().getResource("jtest"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoDeleteTopicNegative() throws Exception {
|
||||||
|
final int numMessages = 100;
|
||||||
|
final SimpleString addressName = new SimpleString("test");
|
||||||
|
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setAutoDeleteAddresses(false));
|
||||||
|
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic topic = session.createTopic(addressName.toString());
|
||||||
|
MessageConsumer messageConsumer = session.createConsumer(topic);
|
||||||
|
MessageProducer producer = session.createProducer(topic);
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
TextMessage mess = session.createTextMessage("msg" + i);
|
||||||
|
producer.send(mess);
|
||||||
|
}
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
// ensure the address was created
|
||||||
|
assertNotNull(server.getAddressInfo(addressName));
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
Message m = messageConsumer.receive(5000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
// ensure the topic was not removed
|
||||||
|
assertFalse(Wait.waitFor(() -> server.getAddressInfo(addressName) == null, 2000, 100));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoDeleteTopicDurableSubscriber() throws Exception {
|
public void testAutoDeleteTopicDurableSubscriber() throws Exception {
|
||||||
Connection connection = cf.createConnection();
|
Connection connection = cf.createConnection();
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImp
|
||||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
@ -198,6 +199,21 @@ public class TemporaryQueueTest extends SingleServerTestBase {
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreserveNonTemporaryAddressAfterConnectionIsClosed() throws Exception {
|
||||||
|
SimpleString queue = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setAutoDeleteAddresses(false));
|
||||||
|
|
||||||
|
server.addAddressInfo(new AddressInfo(address).setTemporary(false).setAutoCreated(true));
|
||||||
|
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false).setTemporary(true));
|
||||||
|
assertNotNull(server.getAddressInfo(address));
|
||||||
|
session.close();
|
||||||
|
sf.close();
|
||||||
|
Wait.assertTrue(() -> server.locateQueue(queue) == null, 2000, 100);
|
||||||
|
assertFalse(Wait.waitFor(() -> server.getAddressInfo(address) == null, 2000, 100));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueWithWildcard() throws Exception {
|
public void testQueueWithWildcard() throws Exception {
|
||||||
session.createQueue(new QueueConfiguration("queue1").setAddress("a.b"));
|
session.createQueue(new QueueConfiguration("queue1").setAddress("a.b"));
|
||||||
|
|
Loading…
Reference in New Issue