mirror of https://github.com/apache/activemq.git
ensure broker stop ocurrs after subs connected and add defensive check to close identified by one failure scenario
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1296396 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d2f8eded57
commit
ac54a611f3
|
@ -964,7 +964,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
// so that in progress operations can notice and unblock.
|
// so that in progress operations can notice and unblock.
|
||||||
List<TransportConnectionState> connectionStates = listConnectionStates();
|
List<TransportConnectionState> connectionStates = listConnectionStates();
|
||||||
for (TransportConnectionState cs : connectionStates) {
|
for (TransportConnectionState cs : connectionStates) {
|
||||||
cs.getContext().getStopping().set(true);
|
ConnectionContext connectionContext = cs.getContext();
|
||||||
|
if (connectionContext != null) {
|
||||||
|
connectionContext.getStopping().set(true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
|
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
|
||||||
|
|
|
@ -172,8 +172,9 @@ public class DurableConsumerTest extends CombinationTestSupport{
|
||||||
|
|
||||||
Thread publisherThread = new Thread(new MessagePublisher());
|
Thread publisherThread = new Thread(new MessagePublisher());
|
||||||
publisherThread.start();
|
publisherThread.start();
|
||||||
final List<SimpleTopicSubscriber> list = new ArrayList<SimpleTopicSubscriber>();
|
final int numSubs = 100;
|
||||||
for (int i = 0; i < 100; i++) {
|
final List<SimpleTopicSubscriber> list = new ArrayList<SimpleTopicSubscriber>(numSubs);
|
||||||
|
for (int i = 0; i < numSubs; i++) {
|
||||||
|
|
||||||
final int id = i;
|
final int id = i;
|
||||||
Thread thread = new Thread(new Runnable(){
|
Thread thread = new Thread(new Runnable(){
|
||||||
|
@ -186,7 +187,13 @@ public class DurableConsumerTest extends CombinationTestSupport{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.sleep(5000);
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return numSubs == list.size();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker = createBroker(false);
|
broker = createBroker(false);
|
||||||
configurePersistence(broker);
|
configurePersistence(broker);
|
||||||
|
@ -195,7 +202,7 @@ public class DurableConsumerTest extends CombinationTestSupport{
|
||||||
for (SimpleTopicSubscriber s:list) {
|
for (SimpleTopicSubscriber s:list) {
|
||||||
s.closeConnection();
|
s.closeConnection();
|
||||||
}
|
}
|
||||||
assertEquals(0, exceptions.size());
|
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
// makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028
|
// makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028
|
||||||
|
|
Loading…
Reference in New Issue