This commit is contained in:
Dejan Bosanac 2014-02-21 11:22:00 +01:00
parent dcbac84a8c
commit c6fe94ec03
4 changed files with 137 additions and 21 deletions

View File

@ -144,9 +144,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
}
Command command = (Command) o;
Response response = service(command);
if (response != null && !brokerService.isStopping() ) {
dispatchSync(response);
if (!brokerService.isStopping()) {
Response response = service(command);
if (response != null && !brokerService.isStopping()) {
dispatchSync(response);
}
} else {
throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
}
} finally {
serviceLock.readLock().unlock();

View File

@ -216,8 +216,12 @@ public class TransportConnector implements Connector, BrokerServiceAware {
@Override
public void run() {
try {
Connection connection = createConnection(transport);
connection.start();
if (!brokerService.isStopping()) {
Connection connection = createConnection(transport);
connection.start();
} else {
throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
}
} catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
ServiceSupport.dispose(transport);

View File

@ -18,13 +18,9 @@ package org.apache.activemq.xbean;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.*;
import junit.framework.TestCase;
@ -36,6 +32,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -86,13 +83,116 @@ public class ConnectorXBeanConfigTest extends TestCase {
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Destination dest = new ActiveMQQueue("test");
MessageProducer producer = sess.createProducer(dest);
MessageConsumer consumer = sess.createConsumer(dest);
MessageProducer producer = sess.createProducer(dest);
producer.send(sess.createTextMessage("test"));
TextMessage msg = (TextMessage)consumer.receive(1000);
assertEquals("test", msg.getText());
}
public void testBrokerWontStop() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?async=false");
factory.setDispatchAsync(false);
factory.setAlwaysSessionAsync(false);
Connection conn = factory.createConnection();
final Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
conn.start();
final Destination dest = new ActiveMQQueue("TEST");
final CountDownLatch stop = new CountDownLatch(1);
final CountDownLatch sendSecond = new CountDownLatch(1);
final CountDownLatch shutdown = new CountDownLatch(1);
final CountDownLatch test = new CountDownLatch(1);
ActiveMQConnectionFactory testFactory = new ActiveMQConnectionFactory("vm://localhost?async=false");
Connection testConn = testFactory.createConnection();
testConn.start();
Destination testDestination = sess.createQueue("NEW");
Session testSess = testConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer testProducer = testSess.createProducer(testDestination);
final Thread consumerThread = new Thread() {
@Override
public void run() {
try {
MessageProducer producer = sess.createProducer(dest);
producer.send(sess.createTextMessage("msg1"));
MessageConsumer consumer = sess.createConsumer(dest);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
// send a message that will block
Thread.sleep(2000);
sendSecond.countDown();
// try to stop the broker
Thread.sleep(5000);
stop.countDown();
// run the test
Thread.sleep(5000);
test.countDown();
shutdown.await();
} catch (InterruptedException ie) {
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
};
consumerThread.start();
final Thread producerThread = new Thread() {
@Override
public void run() {
try {
sendSecond.await();
MessageProducer producer = sess.createProducer(dest);
producer.send(sess.createTextMessage("msg2"));
} catch (Exception e) {
e.printStackTrace();
}
}
};
producerThread.start();
final Thread stopThread = new Thread() {
@Override
public void run() {
try {
stop.await();
brokerService.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
};
stopThread.start();
test.await();
try {
testSess.createConsumer(testDestination);
fail("Should have failed creating a consumer!");
} catch (Exception e) {
e.printStackTrace();
}
try {
testProducer.send(testSess.createTextMessage("msg3"));
fail("Should have failed sending a message!");
} catch (Exception e) {
e.printStackTrace();
}
shutdown.countDown();
}
@Override
protected void setUp() throws Exception {
brokerService = createBroker();

View File

@ -27,19 +27,27 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">
<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" persistent="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb" optimizedDispatch="true">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<networkConnectors>
<networkConnector uri="static://(tcp://localhost:61616)">
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>