This commit is contained in:
Justin Bertram 2017-10-31 08:37:40 -05:00
commit de69673fc0
2 changed files with 43 additions and 23 deletions

View File

@ -16,9 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
@ -45,7 +43,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@ -317,35 +314,22 @@ public class AMQPSessionCallback implements SessionCallback {
public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
final CountDownLatch latch = new CountDownLatch(1);
Runnable runnable = new Runnable() {
serverSession.getSessionContext().executeOnCompletion(new IOCallback() {
@Override
public void run() {
public void done() {
try {
consumer.close(false);
latch.countDown();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
};
// Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
// to avoid deadlocks the close has to be done outside of the main thread on an executor
// otherwise you could get a deadlock
Executor executor = protonSPI.getExeuctor();
@Override
public void onError(int errorCode, String errorMessage) {
}
});
if (executor != null) {
executor.execute(runnable);
} else {
runnable.run();
}
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
}
}
public String tempQueueName() {

View File

@ -260,6 +260,42 @@ public class ConsumerTest extends ActiveMQTestBase {
assertEquals(0, server.getTotalMessageCount());
}
@Test
public void testAutoCreateCOnConsumer() throws Throwable {
final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue");
if (!isNetty()) {
// no need to run the test, there's no AMQP support
return;
}
for (int i = 0; i < 10; i++) {
ConnectionFactory factorySend = createFactory(2);
Connection connection = factorySend.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(thisQueue.toString());
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
producer.send(session.createTextMessage("hello"));
Assert.assertNotNull(consumer.receive(5000));
consumer.close();
session.close();
} finally {
connection.close();
}
Wait.waitFor(() -> server.getAddressInfo(thisQueue) == null, 1000, 10);
assertNull(server.getAddressInfo(thisQueue));
assertEquals(0, server.getTotalMessageCount());
}
}
@Test
public void testAutoDeleteAutoCreatedAddressAndQueue() throws Throwable {
if (!isNetty()) {