ARTEMIS-1445 fix auto-delete for AMQP and OpenWire

This commit is contained in:
Justin Bertram 2017-10-03 20:40:27 -05:00
parent 383291d4d7
commit 03d56d2cf5
4 changed files with 134 additions and 18 deletions

View File

@ -264,7 +264,7 @@ public class AMQPSessionCallback implements SessionCallback {
if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) { if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
try { try {
serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true); serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing. // The queue may have been created by another thread in the mean time. Catch and do nothing.
} }
@ -289,7 +289,7 @@ public class AMQPSessionCallback implements SessionCallback {
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
} else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) { } else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
try { try {
serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true); serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing. // The queue may have been created by another thread in the mean time. Catch and do nothing.
} }

View File

@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
@ -57,7 +56,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -70,6 +68,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.TempQueueObserver; import org.apache.activemq.artemis.core.server.TempQueueObserver;
import org.apache.activemq.artemis.core.server.impl.RefsOperation; import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
@ -725,20 +724,18 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
public void addDestination(DestinationInfo info) throws Exception { public void addDestination(DestinationInfo info) throws Exception {
boolean created = false;
ActiveMQDestination dest = info.getDestination(); ActiveMQDestination dest = info.getDestination();
if (dest.isQueue()) {
SimpleString qName = new SimpleString(dest.getPhysicalName()); SimpleString qName = SimpleString.toSimpleString(dest.getPhysicalName());
QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName); if (server.locateQueue(qName) == null) {
if (binding == null) { AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(dest.getPhysicalName());
if (dest.isTemporary()) { if (dest.isQueue() && (addressSettings.isAutoCreateQueues() || dest.isTemporary())) {
internalSession.createQueue(qName, qName, RoutingType.ANYCAST, null, dest.isTemporary(), false); internalSession.createQueue(qName, qName, RoutingType.ANYCAST, null, dest.isTemporary(), !dest.isTemporary(), !dest.isTemporary());
} else { created = true;
ConnectionInfo connInfo = getState().getInfo(); } else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() || dest.isTemporary())) {
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; internalSession.createAddress(qName, RoutingType.MULTICAST, !dest.isTemporary());
server.getSecurityStore().check(qName, checkType, this); created = true;
server.checkQueueCreationLimit(getUsername());
server.createQueue(qName, RoutingType.ANYCAST, qName, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), null,true, false);
}
} }
} }
@ -748,7 +745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.state.addTempDestination(info); this.state.addTempDestination(info);
} }
if (!AdvisorySupport.isAdvisoryTopic(dest)) { if (created && !AdvisorySupport.isAdvisoryTopic(dest)) {
AMQConnectionContext context = getContext(); AMQConnectionContext context = getContext();
DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest); DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);

View File

@ -260,6 +260,40 @@ public class ConsumerTest extends ActiveMQTestBase {
assertEquals(0, server.getTotalMessageCount()); assertEquals(0, server.getTotalMessageCount());
} }
@Test
public void testAutoDeleteAutoCreatedAddressAndQueue() throws Throwable {
if (!isNetty()) {
// no need to run the test, there's no AMQP support
return;
}
assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue")));
ConnectionFactory factorySend = createFactory(2);
Connection connection = factorySend.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue("queue");
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0);
producer.send(msg);
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
assertNotNull(consumer.receive(1000));
} finally {
connection.close();
}
assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue")));
assertNull(server.locateQueue(SimpleString.toSimpleString("queue")));
assertEquals(0, server.getTotalMessageCount());
}
@Test @Test
public void testSendCoreReceiveAMQP() throws Throwable { public void testSendCoreReceiveAMQP() throws Throwable {

View File

@ -24,9 +24,11 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.Topic;
import java.util.Map; import java.util.Map;
public class ProducerAutoCreateQueueTest extends BasicOpenWireTest { public class ProducerAutoCreateQueueTest extends BasicOpenWireTest {
@ -53,7 +55,90 @@ public class ProducerAutoCreateQueueTest extends BasicOpenWireTest {
connection.close(); connection.close();
} }
} }
}
@Test
public void testAutoCreateSendToTopic() throws Exception {
Connection connection = null;
try {
connection = factory.createConnection("admin", "password");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic trash = session.createTopic("trash");
final MessageProducer producer = session.createProducer(trash);
producer.send(session.createTextMessage("foo"));
} finally {
if (connection != null) {
connection.close();
}
}
assertNotNull(server.getAddressInfo(new SimpleString("trash")));
assertEquals(0, server.getTotalMessageCount());
}
@Test
public void testAutoCreateSendToQueue() throws Exception {
Connection connection = null;
try {
connection = factory.createConnection("admin", "password");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue trash = session.createQueue("trash");
final MessageProducer producer = session.createProducer(trash);
producer.send(session.createTextMessage("foo"));
} finally {
if (connection != null) {
connection.close();
}
}
assertNotNull(server.getAddressInfo(new SimpleString("trash")));
assertNotNull(server.locateQueue(new SimpleString("trash")));
assertEquals(1, server.getTotalMessageCount());
}
@Test
public void testAutoDelete() throws Exception {
Connection connection = null;
try {
connection = factory.createConnection("admin", "password");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue trash = session.createQueue("trash");
final MessageProducer producer = session.createProducer(trash);
producer.send(session.createTextMessage("foo"));
Assert.assertNotNull(server.locateQueue(new SimpleString("trash")));
MessageConsumer consumer = session.createConsumer(trash);
connection.start();
assertNotNull(consumer.receive(1000));
} finally {
if (connection != null) {
connection.close();
}
}
assertNull(server.locateQueue(new SimpleString("trash")));
}
@Test
public void testAutoDeleteNegative() throws Exception {
server.getAddressSettingsRepository().addMatch("trash", new AddressSettings().setAutoDeleteQueues(false).setAutoDeleteAddresses(false));
Connection connection = null;
try {
connection = factory.createConnection("admin", "password");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue trash = session.createQueue("trash");
final MessageProducer producer = session.createProducer(trash);
producer.send(session.createTextMessage("foo"));
Assert.assertNotNull(server.locateQueue(new SimpleString("trash")));
MessageConsumer consumer = session.createConsumer(trash);
connection.start();
assertNotNull(consumer.receive(1000));
} finally {
if (connection != null) {
connection.close();
}
}
assertNotNull(server.locateQueue(new SimpleString("trash")));
assertNotNull(server.getAddressInfo(new SimpleString("trash")));
} }
} }