git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@987113 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-08-19 10:09:36 +00:00
parent 5b8bcf784e
commit c62b951c52
2 changed files with 50 additions and 1 deletions

View File

@ -246,7 +246,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
this.sessionAsyncDispatch = sessionAsyncDispatch; this.sessionAsyncDispatch = sessionAsyncDispatch;
this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
setTransactionContext(new TransactionContext(connection)); setTransactionContext(new TransactionContext(connection));
connection.addSession(this);
stats = new JMSSessionStatsImpl(producers, consumers); stats = new JMSSessionStatsImpl(producers, consumers);
this.connection.asyncSendPacket(info); this.connection.asyncSendPacket(info);
setTransformer(connection.getTransformer()); setTransformer(connection.getTransformer());
@ -254,6 +253,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
this.scheduler=connection.getScheduler(); this.scheduler=connection.getScheduler();
this.connectionExecutor=connection.getExecutor(); this.connectionExecutor=connection.getExecutor();
this.executor = new ActiveMQSessionExecutor(this); this.executor = new ActiveMQSessionExecutor(this);
connection.addSession(this);
if (connection.isStarted()) { if (connection.isStarted()) {
start(); start();
} }

View File

@ -16,6 +16,15 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -105,4 +114,44 @@ public class JmsConnectionStartStopTest extends TestSupport {
stoppedConnection.stop(); stoppedConnection.stop();
testStoppedConsumerHoldsMessagesTillStarted(); testStoppedConsumerHoldsMessagesTillStarted();
} }
public void testConcurrentSessionCreateWithStart() throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
final Vector<Throwable> exceptions = new Vector<Throwable>();
final Random rand = new Random();
Runnable createSessionTask = new Runnable() {
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (Exception e) {
exceptions.add(e);
}
}
};
Runnable startStopTask = new Runnable() {
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
stoppedConnection.start();
stoppedConnection.stop();
} catch (Exception e) {
exceptions.add(e);
}
}
};
for (int i=0; i<1000; i++) {
executor.execute(createSessionTask);
executor.execute(startStopTask);
}
executor.shutdown();
assertTrue("executor terminated", executor.awaitTermination(30, TimeUnit.SECONDS));
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
}
} }