From c10e6fa8f021921f67ec309b9e4030f8c89ce01d Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 7 May 2015 18:30:02 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5621 Convert to JUnit 4 while fixing sporadic failure due to the expiry setting being left at default of 30 seconds, which on a fast machine means the expiry tests sometime finish before the task kicks in. --- .../AdvisoryTempDestinationTests.java | 51 +++++++------ .../activemq/advisory/AdvisoryTests.java | 72 +++++++++++-------- 2 files changed, 71 insertions(+), 52 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java index 5e20f79c3b..17044b2dfd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.advisory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + import java.util.ArrayList; import java.util.List; @@ -31,8 +35,6 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.Topic; -import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; @@ -41,16 +43,21 @@ import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; -public class AdvisoryTempDestinationTests extends TestCase { +public class AdvisoryTempDestinationTests { protected static final int MESSAGE_COUNT = 2000; + protected static final int EXPIRE_MESSAGE_PERIOD = 10000; + protected BrokerService broker; protected Connection connection; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected int topicCount; - + @Test(timeout = 60000) public void testNoSlowConsumerAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue queue = s.createTemporaryQueue(); @@ -60,8 +67,8 @@ public class AdvisoryTempDestinationTests extends TestCase { public void onMessage(Message message) { } }); - Topic advisoryTopic = AdvisorySupport - .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); + + Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); // start throwing messages at the consumer @@ -75,14 +82,14 @@ public class AdvisoryTempDestinationTests extends TestCase { assertNull(msg); } + @Test(timeout = 60000) public void testSlowConsumerAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue queue = s.createTemporaryQueue(); MessageConsumer consumer = s.createConsumer(queue); assertNotNull(consumer); - Topic advisoryTopic = AdvisorySupport - .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); + Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); // start throwing messages at the consumer @@ -96,6 +103,7 @@ public class AdvisoryTempDestinationTests extends TestCase { assertNotNull(msg); } + @Test(timeout = 60000) public void testMessageDeliveryAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue queue = s.createTemporaryQueue(); @@ -104,7 +112,7 @@ public class AdvisoryTempDestinationTests extends TestCase { Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); - //start throwing messages at the consumer + // start throwing messages at the consumer MessageProducer producer = s.createProducer(queue); BytesMessage m = s.createBytesMessage(); @@ -115,6 +123,7 @@ public class AdvisoryTempDestinationTests extends TestCase { assertNotNull(msg); } + @Test(timeout = 60000) public void testTempMessageConsumedAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue queue = s.createTemporaryQueue(); @@ -122,7 +131,7 @@ public class AdvisoryTempDestinationTests extends TestCase { Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); - //start throwing messages at the consumer + // start throwing messages at the consumer MessageProducer producer = s.createProducer(queue); BytesMessage m = s.createBytesMessage(); @@ -141,6 +150,7 @@ public class AdvisoryTempDestinationTests extends TestCase { assertEquals(originalId, id); } + @Test(timeout = 60000) public void testMessageExpiredAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = s.createQueue(getClass().getName()); @@ -149,7 +159,7 @@ public class AdvisoryTempDestinationTests extends TestCase { Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); - //start throwing messages at the consumer + // start throwing messages at the consumer MessageProducer producer = s.createProducer(queue); producer.setTimeToLive(1); for (int i = 0; i < MESSAGE_COUNT; i++) { @@ -158,34 +168,30 @@ public class AdvisoryTempDestinationTests extends TestCase { producer.send(m); } - Message msg = advisoryConsumer.receive(5000); + Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD); assertNotNull(msg); } - @Override - protected void setUp() throws Exception { + @Before + public void setUp() throws Exception { if (broker == null) { broker = createBroker(); } ConnectionFactory factory = createConnectionFactory(); connection = factory.createConnection(); connection.start(); - super.setUp(); } - @Override - protected void tearDown() throws Exception { - super.tearDown(); + @After + public void tearDown() throws Exception { connection.close(); if (broker != null) { broker.stop(); } } - protected ActiveMQConnectionFactory createConnectionFactory() - throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( - ActiveMQConnection.DEFAULT_BROKER_URL); + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); return cf; } @@ -218,6 +224,7 @@ public class AdvisoryTempDestinationTests extends TestCase { private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) { PolicyEntry policy = new PolicyEntry(); + policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD); policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index 5e5eb7f14f..07a2daa17a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.advisory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -27,29 +31,34 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; -import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.*; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; /** - * + * Test for advisory messages sent under the right circumstances. */ -public class AdvisoryTests extends TestCase { +public class AdvisoryTests { + protected static final int MESSAGE_COUNT = 2000; protected BrokerService broker; protected Connection connection; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected int topicCount; + protected final int EXPIRE_MESSAGE_PERIOD = 10000; + @Test(timeout = 60000) public void testNoSlowConsumerAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = s.createQueue(getClass().getName()); @@ -59,8 +68,8 @@ public class AdvisoryTests extends TestCase { public void onMessage(Message message) { } }); - Topic advisoryTopic = AdvisorySupport - .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); + + Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); // start throwing messages at the consumer @@ -74,14 +83,14 @@ public class AdvisoryTests extends TestCase { assertNull(msg); } + @Test(timeout = 60000) public void testSlowConsumerAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = s.createQueue(getClass().getName()); MessageConsumer consumer = s.createConsumer(queue); assertNotNull(consumer); - Topic advisoryTopic = AdvisorySupport - .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); + Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); // start throwing messages at the consumer @@ -95,6 +104,7 @@ public class AdvisoryTests extends TestCase { assertNotNull(msg); } + @Test(timeout = 60000) public void testMessageDeliveryAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = s.createQueue(getClass().getName()); @@ -103,7 +113,7 @@ public class AdvisoryTests extends TestCase { Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); - //start throwing messages at the consumer + // start throwing messages at the consumer MessageProducer producer = s.createProducer(queue); BytesMessage m = s.createBytesMessage(); @@ -114,6 +124,7 @@ public class AdvisoryTests extends TestCase { assertNotNull(msg); } + @Test(timeout = 60000) public void testMessageConsumedAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = s.createQueue(getClass().getName()); @@ -121,7 +132,7 @@ public class AdvisoryTests extends TestCase { Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); - //start throwing messages at the consumer + // start throwing messages at the consumer MessageProducer producer = s.createProducer(queue); BytesMessage m = s.createBytesMessage(); @@ -140,6 +151,7 @@ public class AdvisoryTests extends TestCase { assertEquals(originalId, id); } + @Test(timeout = 60000) public void testMessageExpiredAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = s.createQueue(getClass().getName()); @@ -148,7 +160,7 @@ public class AdvisoryTests extends TestCase { Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); - //start throwing messages at the consumer + // start throwing messages at the consumer MessageProducer producer = s.createProducer(queue); producer.setTimeToLive(1); for (int i = 0; i < MESSAGE_COUNT; i++) { @@ -157,33 +169,36 @@ public class AdvisoryTests extends TestCase { producer.send(m); } - Message msg = advisoryConsumer.receive(2000); + Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD); assertNotNull(msg); } + @Test(timeout = 60000) public void testMessageDLQd() throws Exception { ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); policy.setTopicPrefetch(2); - ((ActiveMQConnection)connection).setPrefetchPolicy(policy); + ((ActiveMQConnection) connection).setPrefetchPolicy(policy); Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Topic topic = s.createTopic(getClass().getName()); Topic advisoryTopic = s.createTopic(">"); for (int i = 0; i < 100; i++) { - MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + s.createConsumer(advisoryTopic); } - MessageProducer producer = s.createProducer(topic); int count = 10; for (int i = 0; i < count; i++) { BytesMessage m = s.createBytesMessage(); producer.send(m); } + // we should get here without StackOverflow } - public void xtestMessageDiscardedAdvisory() throws Exception { + @Ignore + @Test(timeout = 60000) + public void testMessageDiscardedAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = s.createTopic(getClass().getName()); MessageConsumer consumer = s.createConsumer(topic); @@ -191,7 +206,7 @@ public class AdvisoryTests extends TestCase { Topic advisoryTopic = AdvisorySupport.getMessageDiscardedAdvisoryTopic((ActiveMQDestination) topic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); - //start throwing messages at the consumer + // start throwing messages at the consumer MessageProducer producer = s.createProducer(topic); int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2); for (int i = 0; i < count; i++) { @@ -203,30 +218,26 @@ public class AdvisoryTests extends TestCase { assertNotNull(msg); } - @Override - protected void setUp() throws Exception { + @Before + public void setUp() throws Exception { if (broker == null) { broker = createBroker(); } ConnectionFactory factory = createConnectionFactory(); connection = factory.createConnection(); connection.start(); - super.setUp(); } - @Override + @After protected void tearDown() throws Exception { - super.tearDown(); connection.close(); if (broker != null) { broker.stop(); } } - protected ActiveMQConnectionFactory createConnectionFactory() - throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( - ActiveMQConnection.DEFAULT_BROKER_URL); + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); return cf; } @@ -240,6 +251,7 @@ public class AdvisoryTests extends TestCase { protected void configureBroker(BrokerService answer) throws Exception { answer.setPersistent(false); PolicyEntry policy = new PolicyEntry(); + policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD); policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); @@ -247,7 +259,7 @@ public class AdvisoryTests extends TestCase { policy.setAdvisoryForSlowConsumers(true); policy.setAdvisoryWhenFull(true); policy.setProducerFlowControl(false); - ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); + ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); strategy.setLimit(10); policy.setPendingMessageLimitStrategy(strategy); PolicyMap pMap = new PolicyMap();