diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java index 9241ddf94b..b3026ad030 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java @@ -192,7 +192,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme * For that reason we have this method to force that nonXASession, since the JMS Javadoc * mandates createSession to return a XASession. */ - public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException { + public synchronized Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_GENERIC_CONNECTION); @@ -206,7 +206,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme * For that reason we have this method to force that nonXASession, since the JMS Javadoc * mandates createSession to return a XASession. */ - public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException { + public synchronized Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_TOPIC_CONNECTION); @@ -220,7 +220,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme * For that reason we have this method to force that nonXASession, since the JMS Javadoc * mandates createSession to return a XASession. */ - public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException { + public synchronized Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_QUEUE_CONNECTION); @@ -432,14 +432,14 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme } @Override - public Session createSession(int sessionMode) throws JMSException { + public synchronized Session createSession(int sessionMode) throws JMSException { checkClosed(); return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, ActiveMQSession.TYPE_GENERIC_SESSION); } @Override - public Session createSession() throws JMSException { + public synchronized Session createSession() throws JMSException { checkClosed(); return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, ActiveMQSession.TYPE_GENERIC_SESSION); } @@ -447,7 +447,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme // QueueConnection implementation --------------------------------------------------------------- @Override - public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException { + public synchronized QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_QUEUE_SESSION); } @@ -477,7 +477,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme // TopicConnection implementation --------------------------------------------------------------- @Override - public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException { + public synchronized TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_TOPIC_SESSION); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java index 0d6158ed52..c5eea98421 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java @@ -48,20 +48,20 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA } @Override - public XASession createXASession() throws JMSException { + public synchronized XASession createXASession() throws JMSException { checkClosed(); return (XASession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_GENERIC_SESSION); } @Override - public XAQueueSession createXAQueueSession() throws JMSException { + public synchronized XAQueueSession createXAQueueSession() throws JMSException { checkClosed(); return (XAQueueSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_QUEUE_SESSION); } @Override - public XATopicSession createXATopicSession() throws JMSException { + public synchronized XATopicSession createXATopicSession() throws JMSException { checkClosed(); return (XATopicSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_TOPIC_SESSION); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java index 259162e713..d1ed5badb2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java @@ -20,6 +20,7 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.InvalidClientIDException; import javax.jms.JMSContext; +import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueSession; import javax.jms.Session; @@ -31,15 +32,23 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.jboss.logging.Logger; import org.junit.After; import org.junit.Assert; import org.junit.Test; public class ConnectionTest extends JMSTestBase { + private static final Logger log = Logger.getLogger(ConnectionTest.class); + private Connection conn2; @Test @@ -248,6 +257,55 @@ public class ConnectionTest extends JMSTestBase { } } + @Test + public void testCreateSessionAndCloseConnectionConcurrently() throws Exception { + final int ATTEMPTS = 10; + final int THREAD_COUNT = 50; + final int SESSION_COUNT = 10; + final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + try { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); + + for (int i = 0; i < ATTEMPTS; i++) { + final CountDownLatch lineUp = new CountDownLatch(THREAD_COUNT); + final AtomicBoolean error = new AtomicBoolean(false); + final Connection connection = cf.createConnection(); + + for (int j = 0; j < THREAD_COUNT; ++j) { + executor.execute(() -> { + for (int k = 0; k < SESSION_COUNT; k++) { + try { + connection.createSession().close(); + if (k == 0) { + lineUp.countDown(); + } + } catch (javax.jms.IllegalStateException e) { + // ignore + break; + } catch (JMSException e) { + // ignore + break; + } catch (Throwable t) { + log.warn(t.getMessage(), t); + error.set(true); + break; + } + } + }); + } + + // wait until all the threads have created & closed at least 1 session + assertTrue(lineUp.await(10, TimeUnit.SECONDS)); + connection.close(); + if (error.get()) { + assertFalse(error.get()); + } + } + } finally { + executor.shutdownNow(); + } + } + @Override @After public void tearDown() throws Exception {