ARTEMIS-2743 Synchronize JMS connection methods
This commit is contained in:
parent
d0bc946ecb
commit
803ccf7229
|
@ -192,7 +192,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
|
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
|
||||||
* mandates createSession to return a XASession.
|
* mandates createSession to return a XASession.
|
||||||
*/
|
*/
|
||||||
public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException {
|
public synchronized Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
|
|
||||||
return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_GENERIC_CONNECTION);
|
return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_GENERIC_CONNECTION);
|
||||||
|
@ -206,7 +206,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
|
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
|
||||||
* mandates createSession to return a XASession.
|
* mandates createSession to return a XASession.
|
||||||
*/
|
*/
|
||||||
public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
|
public synchronized Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
|
|
||||||
return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_TOPIC_CONNECTION);
|
return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_TOPIC_CONNECTION);
|
||||||
|
@ -220,7 +220,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
|
* For that reason we have this method to force that nonXASession, since the JMS Javadoc
|
||||||
* mandates createSession to return a XASession.
|
* mandates createSession to return a XASession.
|
||||||
*/
|
*/
|
||||||
public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
|
public synchronized Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
|
|
||||||
return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_QUEUE_CONNECTION);
|
return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_QUEUE_CONNECTION);
|
||||||
|
@ -432,14 +432,14 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Session createSession(int sessionMode) throws JMSException {
|
public synchronized Session createSession(int sessionMode) throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, ActiveMQSession.TYPE_GENERIC_SESSION);
|
return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, ActiveMQSession.TYPE_GENERIC_SESSION);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Session createSession() throws JMSException {
|
public synchronized Session createSession() throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, ActiveMQSession.TYPE_GENERIC_SESSION);
|
return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, ActiveMQSession.TYPE_GENERIC_SESSION);
|
||||||
}
|
}
|
||||||
|
@ -447,7 +447,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
// QueueConnection implementation ---------------------------------------------------------------
|
// QueueConnection implementation ---------------------------------------------------------------
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException {
|
public synchronized QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_QUEUE_SESSION);
|
return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_QUEUE_SESSION);
|
||||||
}
|
}
|
||||||
|
@ -477,7 +477,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
// TopicConnection implementation ---------------------------------------------------------------
|
// TopicConnection implementation ---------------------------------------------------------------
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
|
public synchronized TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_TOPIC_SESSION);
|
return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_TOPIC_SESSION);
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,20 +48,20 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XASession createXASession() throws JMSException {
|
public synchronized XASession createXASession() throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return (XASession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_GENERIC_SESSION);
|
return (XASession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_GENERIC_SESSION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XAQueueSession createXAQueueSession() throws JMSException {
|
public synchronized XAQueueSession createXAQueueSession() throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return (XAQueueSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_QUEUE_SESSION);
|
return (XAQueueSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_QUEUE_SESSION);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XATopicSession createXATopicSession() throws JMSException {
|
public synchronized XATopicSession createXATopicSession() throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return (XATopicSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_TOPIC_SESSION);
|
return (XATopicSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_TOPIC_SESSION);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
import javax.jms.InvalidClientIDException;
|
import javax.jms.InvalidClientIDException;
|
||||||
import javax.jms.JMSContext;
|
import javax.jms.JMSContext;
|
||||||
|
import javax.jms.JMSException;
|
||||||
import javax.jms.QueueConnection;
|
import javax.jms.QueueConnection;
|
||||||
import javax.jms.QueueSession;
|
import javax.jms.QueueSession;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
@ -31,15 +32,23 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.ObjectInputStream;
|
import java.io.ObjectInputStream;
|
||||||
import java.io.ObjectOutputStream;
|
import java.io.ObjectOutputStream;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class ConnectionTest extends JMSTestBase {
|
public class ConnectionTest extends JMSTestBase {
|
||||||
|
|
||||||
|
private static final Logger log = Logger.getLogger(ConnectionTest.class);
|
||||||
|
|
||||||
private Connection conn2;
|
private Connection conn2;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -248,6 +257,55 @@ public class ConnectionTest extends JMSTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateSessionAndCloseConnectionConcurrently() throws Exception {
|
||||||
|
final int ATTEMPTS = 10;
|
||||||
|
final int THREAD_COUNT = 50;
|
||||||
|
final int SESSION_COUNT = 10;
|
||||||
|
final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
|
||||||
|
try {
|
||||||
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
|
||||||
|
|
||||||
|
for (int i = 0; i < ATTEMPTS; i++) {
|
||||||
|
final CountDownLatch lineUp = new CountDownLatch(THREAD_COUNT);
|
||||||
|
final AtomicBoolean error = new AtomicBoolean(false);
|
||||||
|
final Connection connection = cf.createConnection();
|
||||||
|
|
||||||
|
for (int j = 0; j < THREAD_COUNT; ++j) {
|
||||||
|
executor.execute(() -> {
|
||||||
|
for (int k = 0; k < SESSION_COUNT; k++) {
|
||||||
|
try {
|
||||||
|
connection.createSession().close();
|
||||||
|
if (k == 0) {
|
||||||
|
lineUp.countDown();
|
||||||
|
}
|
||||||
|
} catch (javax.jms.IllegalStateException e) {
|
||||||
|
// ignore
|
||||||
|
break;
|
||||||
|
} catch (JMSException e) {
|
||||||
|
// ignore
|
||||||
|
break;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.warn(t.getMessage(), t);
|
||||||
|
error.set(true);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait until all the threads have created & closed at least 1 session
|
||||||
|
assertTrue(lineUp.await(10, TimeUnit.SECONDS));
|
||||||
|
connection.close();
|
||||||
|
if (error.get()) {
|
||||||
|
assertFalse(error.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
executor.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue