This closes #2385
This commit is contained in:
commit
638ff75f4b
|
@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
|
@ -268,7 +269,11 @@ public class StompSession implements SessionCallback {
|
||||||
}
|
}
|
||||||
queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
|
queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
|
||||||
if (manager.getServer().locateQueue(queueName) == null) {
|
if (manager.getServer().locateQueue(queueName) == null) {
|
||||||
session.createQueue(address, queueName, selectorSimple, false, true);
|
try {
|
||||||
|
session.createQueue(address, queueName, selectorSimple, false, true);
|
||||||
|
} catch (ActiveMQQueueExistsException e) {
|
||||||
|
// ignore; can be caused by concurrent durable subscribers
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
|
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
|
||||||
|
|
|
@ -32,10 +32,12 @@ import java.util.Collection;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
|
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
|
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
|
||||||
|
@ -1451,6 +1453,87 @@ public class StompV12Test extends StompTestBase {
|
||||||
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected());
|
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleDurableSubscribers() throws Exception {
|
||||||
|
org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
|
||||||
|
conn.connect(defUser, defPass, "myClientID");
|
||||||
|
StompClientConnectionV12 conn2 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
|
||||||
|
conn2.connect(defUser, defPass, "myClientID");
|
||||||
|
|
||||||
|
subscribe(conn, UUID.randomUUID().toString(), "client-individual", getName());
|
||||||
|
subscribe(conn2, UUID.randomUUID().toString(), "clientindividual", getName());
|
||||||
|
|
||||||
|
conn.closeTransport();
|
||||||
|
waitDisconnect(conn);
|
||||||
|
conn2.closeTransport();
|
||||||
|
waitDisconnect(conn2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleConcurrentDurableSubscribers() throws Exception {
|
||||||
|
org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
|
||||||
|
|
||||||
|
int NUMBER_OF_THREADS = 25;
|
||||||
|
SubscriberThread[] threads = new SubscriberThread[NUMBER_OF_THREADS];
|
||||||
|
final CountDownLatch startFlag = new CountDownLatch(1);
|
||||||
|
final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
|
||||||
|
|
||||||
|
for (int i = 0; i < threads.length; i++) {
|
||||||
|
threads[i] = new SubscriberThread("subscriber::" + i, StompClientConnectionFactory.createClientConnection(uri), startFlag, alignFlag);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (SubscriberThread t : threads) {
|
||||||
|
t.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
alignFlag.await();
|
||||||
|
|
||||||
|
startFlag.countDown();
|
||||||
|
|
||||||
|
for (SubscriberThread t : threads) {
|
||||||
|
t.join();
|
||||||
|
Assert.assertEquals(0, t.errors.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class SubscriberThread extends Thread {
|
||||||
|
final StompClientConnection connection;
|
||||||
|
final CountDownLatch startFlag;
|
||||||
|
final CountDownLatch alignFlag;
|
||||||
|
final AtomicInteger errors = new AtomicInteger(0);
|
||||||
|
|
||||||
|
SubscriberThread(String name, StompClientConnection connection, CountDownLatch startFlag, CountDownLatch alignFlag) {
|
||||||
|
super(name);
|
||||||
|
this.connection = connection;
|
||||||
|
this.startFlag = startFlag;
|
||||||
|
this.alignFlag = alignFlag;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
alignFlag.countDown();
|
||||||
|
startFlag.await();
|
||||||
|
connection.connect(defUser, defPass, "myClientID");
|
||||||
|
ClientStompFrame frame = subscribeTopic(connection, UUID.randomUUID().toString(), "client-individual", "123");
|
||||||
|
if (frame.getCommand().equals(Stomp.Responses.ERROR)) {
|
||||||
|
|
||||||
|
errors.incrementAndGet();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
errors.incrementAndGet();
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
connection.disconnect();
|
||||||
|
waitDisconnect((StompClientConnectionV12) connection);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDurableSubscriberWithReconnection() throws Exception {
|
public void testDurableSubscriberWithReconnection() throws Exception {
|
||||||
conn.connect(defUser, defPass, CLIENT_ID);
|
conn.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
Loading…
Reference in New Issue