From e81453e6608ce70436576f1ed54b1f62d30ddc2e Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 9 Nov 2018 11:38:28 -0600 Subject: [PATCH] ARTEMIS-2140 queue creation race w/AMQP shared subs --- .../amqp/broker/AMQPSessionCallback.java | 2 ++ .../amqp/JMSMessageConsumerTest.java | 31 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 14c1042cd0..1ca4410a4d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -284,6 +284,8 @@ public class AMQPSessionCallback implements SessionCallback { serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true); } catch (ActiveMQSecurityException se) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage()); + } catch (ActiveMQQueueExistsException e) { + // ignore as may be caused by multiple, concurrent clients } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java index 0d8fddee9a..485d886d92 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java @@ -21,7 +21,10 @@ import java.util.Enumeration; import java.util.Random; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -35,6 +38,7 @@ import javax.jms.MessageProducer; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.tests.util.Wait; @@ -836,4 +840,31 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { connection.close(); } } + + @Test + public void testConcurrentSharedConsumerConnections() throws Exception { + final int concurrentConnections = 20; + final ExecutorService executorService = Executors.newFixedThreadPool(concurrentConnections); + + final AtomicBoolean failedToSubscribe = new AtomicBoolean(false); + for (int i = 1; i < concurrentConnections; i++) { + executorService.submit(() -> { + try (Connection connection = createConnection()) { + connection.start(); + @SuppressWarnings("resource") + final Session session = connection.createSession(); + final Topic topic = session.createTopic("topics.foo"); + session.createSharedConsumer(topic, "MY_SUB"); + Thread.sleep(100); + } catch (final Exception ex) { + ex.printStackTrace(); + failedToSubscribe.set(true); + } + }); + } + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.SECONDS); + + assertFalse(failedToSubscribe.get()); + } }