diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index f15b0c8830..2e6997dc1c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -246,7 +246,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta this.sessionAsyncDispatch = sessionAsyncDispatch; this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); setTransactionContext(new TransactionContext(connection)); - connection.addSession(this); stats = new JMSSessionStatsImpl(producers, consumers); this.connection.asyncSendPacket(info); setTransformer(connection.getTransformer()); @@ -254,6 +253,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta this.scheduler=connection.getScheduler(); this.connectionExecutor=connection.getExecutor(); this.executor = new ActiveMQSessionExecutor(this); + connection.addSession(this); if (connection.isStarted()) { start(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java index a7a3dc9b4d..cf913bd443 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java @@ -16,6 +16,15 @@ */ package org.apache.activemq; +import java.util.Random; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -105,4 +114,44 @@ public class JmsConnectionStartStopTest extends TestSupport { stoppedConnection.stop(); testStoppedConsumerHoldsMessagesTillStarted(); } + + + public void testConcurrentSessionCreateWithStart() throws Exception { + ThreadPoolExecutor executor = new ThreadPoolExecutor(50, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); + final Vector exceptions = new Vector(); + final Random rand = new Random(); + Runnable createSessionTask = new Runnable() { + public void run() { + try { + TimeUnit.MILLISECONDS.sleep(rand.nextInt(10)); + stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (Exception e) { + exceptions.add(e); + } + } + }; + + Runnable startStopTask = new Runnable() { + public void run() { + try { + TimeUnit.MILLISECONDS.sleep(rand.nextInt(10)); + stoppedConnection.start(); + stoppedConnection.stop(); + } catch (Exception e) { + exceptions.add(e); + } + } + }; + + for (int i=0; i<1000; i++) { + executor.execute(createSessionTask); + executor.execute(startStopTask); + } + + executor.shutdown(); + assertTrue("executor terminated", executor.awaitTermination(30, TimeUnit.SECONDS)); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } }