diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index aab414ff5f..19f6351f5a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -16,9 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.broker; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; @@ -45,7 +43,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; @@ -317,35 +314,22 @@ public class AMQPSessionCallback implements SessionCallback { public void closeSender(final Object brokerConsumer) throws Exception { final ServerConsumer consumer = ((ServerConsumer) brokerConsumer); - final CountDownLatch latch = new CountDownLatch(1); - Runnable runnable = new Runnable() { + serverSession.getSessionContext().executeOnCompletion(new IOCallback() { @Override - public void run() { + public void done() { try { consumer.close(false); - latch.countDown(); } catch (Exception e) { + logger.warn(e.getMessage(), e); } } - }; - // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol) - // to avoid deadlocks the close has to be done outside of the main thread on an executor - // otherwise you could get a deadlock - Executor executor = protonSPI.getExeuctor(); + @Override + public void onError(int errorCode, String errorMessage) { + } + }); - if (executor != null) { - executor.execute(runnable); - } else { - runnable.run(); - } - - try { - latch.await(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue()); - } } public String tempQueueName() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 5733713979..af172c8dad 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -260,6 +260,42 @@ public class ConsumerTest extends ActiveMQTestBase { assertEquals(0, server.getTotalMessageCount()); } + @Test + public void testAutoCreateCOnConsumer() throws Throwable { + + final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue"); + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + for (int i = 0; i < 10; i++) { + ConnectionFactory factorySend = createFactory(2); + Connection connection = factorySend.createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(thisQueue.toString()); + MessageProducer producer = session.createProducer(queue); + + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + producer.send(session.createTextMessage("hello")); + + Assert.assertNotNull(consumer.receive(5000)); + consumer.close(); + session.close(); + } finally { + connection.close(); + } + + Wait.waitFor(() -> server.getAddressInfo(thisQueue) == null, 1000, 10); + assertNull(server.getAddressInfo(thisQueue)); + assertEquals(0, server.getTotalMessageCount()); + } + } + @Test public void testAutoDeleteAutoCreatedAddressAndQueue() throws Throwable { if (!isNetty()) {