This commit is contained in:
Clebert Suconic 2020-04-29 18:00:57 -04:00
commit 38de1f7963
3 changed files with 68 additions and 10 deletions

View File

@ -192,7 +192,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
* mandates createSession to return a XASession.
*/
public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException {
public synchronized Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_GENERIC_CONNECTION);
@ -206,7 +206,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
* mandates createSession to return a XASession.
*/
public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
public synchronized Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_TOPIC_CONNECTION);
@ -220,7 +220,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
* mandates createSession to return a XASession.
*/
public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
public synchronized Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_QUEUE_CONNECTION);
@ -432,14 +432,14 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
}
@Override
public Session createSession(int sessionMode) throws JMSException {
public synchronized Session createSession(int sessionMode) throws JMSException {
checkClosed();
return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, ActiveMQSession.TYPE_GENERIC_SESSION);
}
@Override
public Session createSession() throws JMSException {
public synchronized Session createSession() throws JMSException {
checkClosed();
return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, ActiveMQSession.TYPE_GENERIC_SESSION);
}
@ -447,7 +447,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
// QueueConnection implementation ---------------------------------------------------------------
@Override
public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException {
public synchronized QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_QUEUE_SESSION);
}
@ -477,7 +477,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
// TopicConnection implementation ---------------------------------------------------------------
@Override
public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
public synchronized TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_TOPIC_SESSION);
}

View File

@ -48,20 +48,20 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA
}
@Override
public XASession createXASession() throws JMSException {
public synchronized XASession createXASession() throws JMSException {
checkClosed();
return (XASession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_GENERIC_SESSION);
}
@Override
public XAQueueSession createXAQueueSession() throws JMSException {
public synchronized XAQueueSession createXAQueueSession() throws JMSException {
checkClosed();
return (XAQueueSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_QUEUE_SESSION);
}
@Override
public XATopicSession createXATopicSession() throws JMSException {
public synchronized XATopicSession createXATopicSession() throws JMSException {
checkClosed();
return (XATopicSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_TOPIC_SESSION);
}

View File

@ -20,6 +20,7 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
@ -31,15 +32,23 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class ConnectionTest extends JMSTestBase {
private static final Logger log = Logger.getLogger(ConnectionTest.class);
private Connection conn2;
@Test
@ -248,6 +257,55 @@ public class ConnectionTest extends JMSTestBase {
}
}
@Test
public void testCreateSessionAndCloseConnectionConcurrently() throws Exception {
final int ATTEMPTS = 10;
final int THREAD_COUNT = 50;
final int SESSION_COUNT = 10;
final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
for (int i = 0; i < ATTEMPTS; i++) {
final CountDownLatch lineUp = new CountDownLatch(THREAD_COUNT);
final AtomicBoolean error = new AtomicBoolean(false);
final Connection connection = cf.createConnection();
for (int j = 0; j < THREAD_COUNT; ++j) {
executor.execute(() -> {
for (int k = 0; k < SESSION_COUNT; k++) {
try {
connection.createSession().close();
if (k == 0) {
lineUp.countDown();
}
} catch (javax.jms.IllegalStateException e) {
// ignore
break;
} catch (JMSException e) {
// ignore
break;
} catch (Throwable t) {
log.warn(t.getMessage(), t);
error.set(true);
break;
}
}
});
}
// wait until all the threads have created & closed at least 1 session
assertTrue(lineUp.await(10, TimeUnit.SECONDS));
connection.close();
if (error.get()) {
assertFalse(error.get());
}
}
} finally {
executor.shutdownNow();
}
}
@Override
@After
public void tearDown() throws Exception {