ARTEMIS-1929 race in STOMP w/identical durable subs

This commit is contained in:
Justin Bertram 2018-09-25 16:41:27 -05:00 committed by Howard Gao
parent 7c5470548a
commit 2e53d8f5fb
2 changed files with 89 additions and 1 deletions

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
@ -268,7 +269,11 @@ public class StompSession implements SessionCallback {
} }
queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName); queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
if (manager.getServer().locateQueue(queueName) == null) { if (manager.getServer().locateQueue(queueName) == null) {
session.createQueue(address, queueName, selectorSimple, false, true); try {
session.createQueue(address, queueName, selectorSimple, false, true);
} catch (ActiveMQQueueExistsException e) {
// ignore; can be caused by concurrent durable subscribers
}
} }
} else { } else {
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID(); queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();

View File

@ -32,10 +32,12 @@ import java.util.Collection;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
@ -1451,6 +1453,87 @@ public class StompV12Test extends StompTestBase {
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected());
} }
@Test
public void testMultipleDurableSubscribers() throws Exception {
org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
conn.connect(defUser, defPass, "myClientID");
StompClientConnectionV12 conn2 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
conn2.connect(defUser, defPass, "myClientID");
subscribe(conn, UUID.randomUUID().toString(), "client-individual", getName());
subscribe(conn2, UUID.randomUUID().toString(), "clientindividual", getName());
conn.closeTransport();
waitDisconnect(conn);
conn2.closeTransport();
waitDisconnect(conn2);
}
@Test
public void testMultipleConcurrentDurableSubscribers() throws Exception {
org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
int NUMBER_OF_THREADS = 25;
SubscriberThread[] threads = new SubscriberThread[NUMBER_OF_THREADS];
final CountDownLatch startFlag = new CountDownLatch(1);
final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
for (int i = 0; i < threads.length; i++) {
threads[i] = new SubscriberThread("subscriber::" + i, StompClientConnectionFactory.createClientConnection(uri), startFlag, alignFlag);
}
for (SubscriberThread t : threads) {
t.start();
}
alignFlag.await();
startFlag.countDown();
for (SubscriberThread t : threads) {
t.join();
Assert.assertEquals(0, t.errors.get());
}
}
class SubscriberThread extends Thread {
final StompClientConnection connection;
final CountDownLatch startFlag;
final CountDownLatch alignFlag;
final AtomicInteger errors = new AtomicInteger(0);
SubscriberThread(String name, StompClientConnection connection, CountDownLatch startFlag, CountDownLatch alignFlag) {
super(name);
this.connection = connection;
this.startFlag = startFlag;
this.alignFlag = alignFlag;
}
@Override
public void run() {
try {
alignFlag.countDown();
startFlag.await();
connection.connect(defUser, defPass, "myClientID");
ClientStompFrame frame = subscribeTopic(connection, UUID.randomUUID().toString(), "client-individual", "123");
if (frame.getCommand().equals(Stomp.Responses.ERROR)) {
errors.incrementAndGet();
}
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
} finally {
try {
connection.disconnect();
waitDisconnect((StompClientConnectionV12) connection);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
@Test @Test
public void testDurableSubscriberWithReconnection() throws Exception { public void testDurableSubscriberWithReconnection() throws Exception {
conn.connect(defUser, defPass, CLIENT_ID); conn.connect(defUser, defPass, CLIENT_ID);