ARTEMIS-2140 queue creation race w/AMQP shared subs

This commit is contained in:
Justin Bertram 2018-11-09 11:38:28 -06:00 committed by Clebert Suconic
parent 29062935a5
commit e81453e660
2 changed files with 33 additions and 0 deletions

View File

@ -284,6 +284,8 @@ public class AMQPSessionCallback implements SessionCallback {
serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true);
} catch (ActiveMQSecurityException se) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
} catch (ActiveMQQueueExistsException e) {
// ignore as may be caused by multiple, concurrent clients
}
}

View File

@ -21,7 +21,10 @@ import java.util.Enumeration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@ -35,6 +38,7 @@ import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.Wait;
@ -836,4 +840,31 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
connection.close();
}
}
@Test
public void testConcurrentSharedConsumerConnections() throws Exception {
final int concurrentConnections = 20;
final ExecutorService executorService = Executors.newFixedThreadPool(concurrentConnections);
final AtomicBoolean failedToSubscribe = new AtomicBoolean(false);
for (int i = 1; i < concurrentConnections; i++) {
executorService.submit(() -> {
try (Connection connection = createConnection()) {
connection.start();
@SuppressWarnings("resource")
final Session session = connection.createSession();
final Topic topic = session.createTopic("topics.foo");
session.createSharedConsumer(topic, "MY_SUB");
Thread.sleep(100);
} catch (final Exception ex) {
ex.printStackTrace();
failedToSubscribe.set(true);
}
});
}
executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
assertFalse(failedToSubscribe.get());
}
}