From 2e53d8f5fbf60da0aedc53e9da77f7579f301cad Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 25 Sep 2018 16:41:27 -0500 Subject: [PATCH] ARTEMIS-1929 race in STOMP w/identical durable subs --- .../core/protocol/stomp/StompSession.java | 7 +- .../integration/stomp/v12/StompV12Test.java | 83 +++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 291634f618..b355168915 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; @@ -268,7 +269,11 @@ public class StompSession implements SessionCallback { } queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName); if (manager.getServer().locateQueue(queueName) == null) { - session.createQueue(address, queueName, selectorSimple, false, true); + try { + session.createQueue(address, queueName, selectorSimple, false, true); + } catch (ActiveMQQueueExistsException e) { + // ignore; can be caused by concurrent durable subscribers + } } } else { queueName = UUIDGenerator.getInstance().generateSimpleStringUUID(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java index 61ef22c5ac..e3b61b3841 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java @@ -32,10 +32,12 @@ import java.util.Collection; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; @@ -1451,6 +1453,87 @@ public class StompV12Test extends StompTestBase { Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); } + @Test + public void testMultipleDurableSubscribers() throws Exception { + org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE); + conn.connect(defUser, defPass, "myClientID"); + StompClientConnectionV12 conn2 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri); + conn2.connect(defUser, defPass, "myClientID"); + + subscribe(conn, UUID.randomUUID().toString(), "client-individual", getName()); + subscribe(conn2, UUID.randomUUID().toString(), "clientindividual", getName()); + + conn.closeTransport(); + waitDisconnect(conn); + conn2.closeTransport(); + waitDisconnect(conn2); + } + + @Test + public void testMultipleConcurrentDurableSubscribers() throws Exception { + org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE); + + int NUMBER_OF_THREADS = 25; + SubscriberThread[] threads = new SubscriberThread[NUMBER_OF_THREADS]; + final CountDownLatch startFlag = new CountDownLatch(1); + final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS); + + for (int i = 0; i < threads.length; i++) { + threads[i] = new SubscriberThread("subscriber::" + i, StompClientConnectionFactory.createClientConnection(uri), startFlag, alignFlag); + } + + for (SubscriberThread t : threads) { + t.start(); + } + + alignFlag.await(); + + startFlag.countDown(); + + for (SubscriberThread t : threads) { + t.join(); + Assert.assertEquals(0, t.errors.get()); + } + } + + class SubscriberThread extends Thread { + final StompClientConnection connection; + final CountDownLatch startFlag; + final CountDownLatch alignFlag; + final AtomicInteger errors = new AtomicInteger(0); + + SubscriberThread(String name, StompClientConnection connection, CountDownLatch startFlag, CountDownLatch alignFlag) { + super(name); + this.connection = connection; + this.startFlag = startFlag; + this.alignFlag = alignFlag; + } + + @Override + public void run() { + try { + alignFlag.countDown(); + startFlag.await(); + connection.connect(defUser, defPass, "myClientID"); + ClientStompFrame frame = subscribeTopic(connection, UUID.randomUUID().toString(), "client-individual", "123"); + if (frame.getCommand().equals(Stomp.Responses.ERROR)) { + + errors.incrementAndGet(); + } + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } finally { + try { + connection.disconnect(); + waitDisconnect((StompClientConnectionV12) connection); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + @Test public void testDurableSubscriberWithReconnection() throws Exception { conn.connect(defUser, defPass, CLIENT_ID);