AMQ-9193 - Improve broker shutdown in unit tests

This should improve test reliability for the unit tests so brokers don't
hang around after the end of a test on error. Also increase the surefire
re-run count to 3 times before failing.
This commit is contained in:
Christopher L. Shannon (cshannon) 2023-01-11 06:21:21 -05:00
parent 05ffe8aca0
commit a083ff4d23
12 changed files with 157 additions and 107 deletions

2
Jenkinsfile vendored
View File

@ -99,7 +99,7 @@ pipeline {
echo 'Running tests' echo 'Running tests'
// all tests is very very long (10 hours on Apache Jenkins) // all tests is very very long (10 hours on Apache Jenkins)
// sh 'mvn -B -e test -pl activemq-unit-tests -Dactivemq.tests=all' // sh 'mvn -B -e test -pl activemq-unit-tests -Dactivemq.tests=all'
sh 'mvn -B -e -fae test -Dsurefire.rerunFailingTestsCount=2' sh 'mvn -B -e -fae test -Dsurefire.rerunFailingTestsCount=3'
} }
post { post {
always { always {

View File

@ -467,102 +467,114 @@ public class MDBTest {
public void testErrorOnNoMessageDeliveryBrokerZeroPrefetchConfig() throws Exception { public void testErrorOnNoMessageDeliveryBrokerZeroPrefetchConfig() throws Exception {
final BrokerService brokerService = new BrokerService(); final BrokerService brokerService = new BrokerService();
final String brokerUrl = "vm://zeroPrefetch?create=false";
brokerService.setBrokerName("zeroPrefetch");
brokerService.setPersistent(false);
PolicyMap policyMap = new PolicyMap();
PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
zeroPrefetchPolicy.setQueuePrefetch(0);
policyMap.setDefaultEntry(zeroPrefetchPolicy);
brokerService.setDestinationPolicy(policyMap);
brokerService.start();
final AtomicReference<String> errorMessage = new AtomicReference<String>(); try {
final var appender = new AbstractAppender("test", new AbstractFilter() {}, new MessageLayout(), false, new Property[0]) { final String brokerUrl = "vm://zeroPrefetch?create=false";
@Override brokerService.setBrokerName("zeroPrefetch");
public void append(LogEvent event) { brokerService.setPersistent(false);
if (event.getLevel().isMoreSpecificThan(Level.ERROR)) { PolicyMap policyMap = new PolicyMap();
System.err.println("Event :" + event.getMessage().getFormattedMessage()); PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
errorMessage.set(event.getMessage().getFormattedMessage()); zeroPrefetchPolicy.setQueuePrefetch(0);
policyMap.setDefaultEntry(zeroPrefetchPolicy);
brokerService.setDestinationPolicy(policyMap);
brokerService.start();
final AtomicReference<String> errorMessage = new AtomicReference<String>();
final var appender = new AbstractAppender("test", new AbstractFilter() {
}, new MessageLayout(), false, new Property[0]) {
@Override
public void append(LogEvent event) {
if (event.getLevel().isMoreSpecificThan(Level.ERROR)) {
System.err.println("Event :" + event.getMessage().getFormattedMessage());
errorMessage.set(event.getMessage().getFormattedMessage());
}
} }
}
};
appender.start();
final var logger = org.apache.logging.log4j.core.Logger.class.cast(LogManager.getRootLogger());
logger.addAppender(appender);
logger.get().addAppender(appender, Level.INFO, new AbstractFilter() {});
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisory = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQQueue("TEST")));
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl(brokerUrl);
adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
messageDelivered.countDown();
}; };
}; appender.start();
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); final var logger = org.apache.logging.log4j.core.Logger.class.cast(
activationSpec.setDestinationType(Queue.class.getName()); LogManager.getRootLogger());
activationSpec.setDestination("TEST"); logger.addAppender(appender);
activationSpec.setResourceAdapter(adapter); logger.get().addAppender(appender, Level.INFO, new AbstractFilter() {
activationSpec.validate(); });
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
@Override Connection connection = factory.createConnection();
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException { connection.start();
endpoint.xaresource = resource; Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return endpoint;
MessageConsumer advisory = session.createConsumer(
AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQQueue("TEST")));
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl(brokerUrl);
adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
messageDelivered.countDown();
}
;
};
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestinationType(Queue.class.getName());
activationSpec.setDestination("TEST");
activationSpec.setResourceAdapter(adapter);
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource)
throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
};
// Activate an Endpoint
adapter.endpointActivation(messageEndpointFactory, activationSpec);
ActiveMQMessage msg = (ActiveMQMessage) advisory.receive(4000);
if (msg != null) {
assertEquals("Prefetch size hasn't been set", 0,
((ConsumerInfo) msg.getDataStructure()).getPrefetchSize());
} else {
fail("Consumer hasn't been created");
} }
@Override // Send the broker a message to that endpoint
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
return true; producer.send(session.createTextMessage("Hello!"));
}
};
// Activate an Endpoint connection.close();
adapter.endpointActivation(messageEndpointFactory, activationSpec);
ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(4000); // Wait for the message to be delivered.
if (msg != null) { assertFalse(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
assertEquals("Prefetch size hasn't been set", 0, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
} else { // Shut the Endpoint down.
fail("Consumer hasn't been created"); adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
assertNotNull("We got an error message", errorMessage.get());
assertTrue("correct message: " + errorMessage.get(),
errorMessage.get().contains("zero"));
logger.removeAppender(appender);
logger.get().removeAppender("test");
} finally {
brokerService.stop();
} }
// Send the broker a message to that endpoint
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
producer.send(session.createTextMessage("Hello!"));
connection.close();
// Wait for the message to be delivered.
assertFalse(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
assertNotNull("We got an error message", errorMessage.get());
assertTrue("correct message: " + errorMessage.get(), errorMessage.get().contains("zero"));
logger.removeAppender(appender);
logger.get().removeAppender("test");
brokerService.stop();
} }
@Test @Test

View File

@ -99,8 +99,12 @@ public class ExpiredAckAsyncConsumerTest {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
connectionConsumer.close(); try {
connection.close(); connectionConsumer.close();
connection.close();
} catch (Exception e) {
//swallow any error so broker can still be stopped
}
broker.stop(); broker.stop();
broker.waitUntilStopped(); broker.waitUntilStopped();
} }

View File

@ -63,7 +63,11 @@ public class VirtualTopicWildcardTest {
@After @After
public void afer() throws Exception { public void afer() throws Exception {
connection.close(); try {
connection.close();
} catch (Exception e) {
//swallow any error so broker can still be stopped
}
brokerService.stop(); brokerService.stop();
} }

View File

@ -105,8 +105,12 @@ public class AMQ2801Test
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
conn1.close(); try {
conn2.close(); conn1.close();
conn2.close();
} catch (Exception e) {
//swallow any error so broker can still be stopped
}
if (broker != null) { if (broker != null) {
broker.stop(); broker.stop();
} }

View File

@ -72,12 +72,16 @@ public class AMQ3145Test {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
if (consumer != null) { try {
consumer.close(); if (consumer != null) {
consumer.close();
}
session.close();
connection.stop();
connection.close();
} catch (Exception e) {
//swallow any error so broker can still be stopped
} }
session.close();
connection.stop();
connection.close();
broker.stop(); broker.stop();
} }

View File

@ -71,7 +71,11 @@ public class AMQ3732Test {
@After @After
public void stopBroker() throws Exception { public void stopBroker() throws Exception {
connection.close(); try {
connection.close();
} catch (Exception e) {
//swallow any error so broker can still be stopped
}
broker.stop(); broker.stop();
broker.waitUntilStopped(); broker.waitUntilStopped();

View File

@ -65,8 +65,10 @@ public class AMQ6815Test {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
if (connection != null) { try {
connection.close(); connection.close();
} catch (Exception e) {
//swallow any error so broker can still be stopped
} }
brokerService.stop(); brokerService.stop();
} }

View File

@ -68,8 +68,12 @@ public class OutOfOrderTestCase extends TestCase {
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
session.close(); try {
connection.close(); session.close();
connection.close();
} catch (Exception e) {
//swallow any error so broker can still be stopped
}
brokerService.stop(); brokerService.stop();
} }

View File

@ -83,7 +83,11 @@ public class ProxyTestSupport extends BrokerTestSupport {
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
for (Iterator<StubConnection> iter = connections.iterator(); iter.hasNext();) { for (Iterator<StubConnection> iter = connections.iterator(); iter.hasNext();) {
StubConnection connection = iter.next(); StubConnection connection = iter.next();
connection.stop(); try {
connection.stop();
} catch (Exception e) {
//swallow any error so broker can still be stopped
}
iter.remove(); iter.remove();
} }
remoteBroker.stop(); remoteBroker.stop();

View File

@ -149,7 +149,11 @@ public class JMXRemoveQueueThenSendIgnoredTest {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
connection.close(); try {
connection.close();
} catch (Exception e) {
//swallow any error so broker can still be stopped
}
brokerService.stop(); brokerService.stop();
} }
} }

View File

@ -285,10 +285,14 @@ public class TopicSubscriptionZeroPrefetchTest {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
consumer.close(); try {
producer.close(); consumer.close();
session.close(); producer.close();
connection.close(); session.close();
connection.close();
} catch (Exception e) {
//swallow any error so broker can still be stopped
}
brokerService.stop(); brokerService.stop();
} }