From b0a38ff6c106aa70d58ace22bebdde627a035d26 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Mon, 28 Mar 2011 20:10:41 +0000 Subject: [PATCH] Update the test case so that its not dependent on port 61616 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1086378 13f79535-47bb-0310-9956-ffa450edef68 --- .../usecases/ExpiredMessagesTest.java | 186 +++++++++--------- 1 file changed, 94 insertions(+), 92 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 250844393d..ac7b2a038d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -50,7 +50,7 @@ import static org.apache.activemq.TestSupport.getDestinationStatistics; public class ExpiredMessagesTest extends CombinationTestSupport { private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesTest.class); - + BrokerService broker; Connection connection; Session session; @@ -60,7 +60,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport { public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ"); public boolean useTextMessage = true; public boolean useVMCursor = true; - + protected String brokerUri; + public static Test suite() { return suite(ExpiredMessagesTest.class); } @@ -68,78 +69,79 @@ public class ExpiredMessagesTest extends CombinationTestSupport { public static void main(String[] args) { junit.textui.TestRunner.run(suite()); } - - protected void setUp() throws Exception { + + protected void setUp() throws Exception { final boolean deleteAllMessages = true; broker = createBroker(deleteAllMessages, 100); + brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); } - - public void testExpiredMessages() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); - connection = factory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(destination); - producer.setTimeToLive(100); - consumer = session.createConsumer(destination); - connection.start(); - final AtomicLong received = new AtomicLong(); - - Thread consumerThread = new Thread("Consumer Thread") { - public void run() { - long start = System.currentTimeMillis(); - try { - long end = System.currentTimeMillis(); - while (end - start < 3000) { - if (consumer.receive(1000) != null) { - received.incrementAndGet(); - } - Thread.sleep(100); - end = System.currentTimeMillis(); - } - consumer.close(); - } catch (Throwable ex) { - ex.printStackTrace(); - } - } - }; - - consumerThread.start(); - - final int numMessagesToSend = 10000; - Thread producingThread = new Thread("Producing Thread") { + + public void testExpiredMessages() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri); + connection = factory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(destination); + producer.setTimeToLive(100); + consumer = session.createConsumer(destination); + connection.start(); + final AtomicLong received = new AtomicLong(); + + Thread consumerThread = new Thread("Consumer Thread") { public void run() { + long start = System.currentTimeMillis(); try { - int i = 0; - while (i++ < numMessagesToSend) { - producer.send(session.createTextMessage("test")); - } - producer.close(); + long end = System.currentTimeMillis(); + while (end - start < 3000) { + if (consumer.receive(1000) != null) { + received.incrementAndGet(); + } + Thread.sleep(100); + end = System.currentTimeMillis(); + } + consumer.close(); } catch (Throwable ex) { ex.printStackTrace(); } } - }; - - producingThread.start(); - + }; + + consumerThread.start(); + + final int numMessagesToSend = 10000; + Thread producingThread = new Thread("Producing Thread") { + public void run() { + try { + int i = 0; + while (i++ < numMessagesToSend) { + producer.send(session.createTextMessage("test")); + } + producer.close(); + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + + producingThread.start(); + consumerThread.join(); producingThread.join(); session.close(); - + final DestinationStatistics view = getDestinationStatistics(broker, destination); // wait for all to inflight to expire assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { return view.getInflight().getCount() == 0; - } + } })); assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount()); - + LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount()); - + // wait for all sent to get delivered and expire assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { @@ -148,15 +150,15 @@ public class ExpiredMessagesTest extends CombinationTestSupport { LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getDequeues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount()); return oldEnqueues == view.getEnqueues().getCount(); - } + } }, 60*1000)); - + LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount()); - + assertTrue("got at least what did not expire", received.get() >= view.getDequeues().getCount() - view.getExpired().getCount()); - + assertTrue("all messages expired - queue size gone to zero " + view.getMessages().getCount(), Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() @@ -164,61 +166,61 @@ public class ExpiredMessagesTest extends CombinationTestSupport { return view.getMessages().getCount() == 0; } })); - + final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount(); final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue; - + final DestinationStatistics dlqView = getDestinationStatistics(broker, dlqDestination); LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount() + ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount()); - + Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { return totalExpiredCount == dlqView.getMessages().getCount(); } }); assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount()); - + // memory check assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage()); - assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage()); - + assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage()); + // verify DLQ MessageConsumer dlqConsumer = createDlqConsumer(connection); final DLQListener dlqListener = new DLQListener(); dlqConsumer.setMessageListener(dlqListener); - + Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { return totalExpiredCount == dlqListener.count; } }, 60 * 1000); - + assertEquals("dlq returned all expired", dlqListener.count, totalExpiredCount); - } + } class DLQListener implements MessageListener { - + int count = 0; - + public void onMessage(Message message) { count++; } - + }; - - private MessageConsumer createDlqConsumer(Connection connection) throws Exception { - return connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination); + + private MessageConsumer createDlqConsumer(Connection connection) throws Exception { + return connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination); } public void initCombosForTestRecoverExpiredMessages() { - addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE}); - } - - public void testRecoverExpiredMessages() throws Exception { + addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE}); + } + + public void testRecoverExpiredMessages() throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( - "failover://tcp://localhost:61616"); + "failover://"+brokerUri); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -247,7 +249,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { producingThread.join(); DestinationStatistics view = getDestinationStatistics(broker, destination); - LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " @@ -263,7 +265,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { LOG.info("recovering broker"); final boolean deleteAllMessages = false; broker = createBroker(deleteAllMessages, 5000); - + Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { DestinationStatistics view = getDestinationStatistics(broker, destination); @@ -273,25 +275,25 @@ public class ExpiredMessagesTest extends CombinationTestSupport { + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount()); - + return view.getMessages().getCount() == 0; } }); - + view = getDestinationStatistics(broker, destination); assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount()); assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount()); } - private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception { - BrokerService broker = new BrokerService(); + private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception { + BrokerService broker = new BrokerService(); broker.setBrokerName("localhost"); broker.setDestinations(new ActiveMQDestination[]{destination}); AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter(); adaptor.setDirectory(new File("target/expiredtest-data/")); adaptor.setForceRecoverReferenceStore(true); broker.setPersistenceAdapter(adaptor); - + PolicyEntry defaultPolicy = new PolicyEntry(); if (useVMCursor) { defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); @@ -302,17 +304,17 @@ public class ExpiredMessagesTest extends CombinationTestSupport { policyMap.setDefaultEntry(defaultPolicy); broker.setDestinationPolicy(policyMap); broker.setDeleteAllMessagesOnStartup(deleteAllMessages); - broker.addConnector("tcp://localhost:61616"); + broker.addConnector("tcp://localhost:0"); broker.start(); broker.waitUntilStarted(); return broker; - } - - + } - protected void tearDown() throws Exception { - connection.stop(); - broker.stop(); - broker.waitUntilStopped(); - } + + + protected void tearDown() throws Exception { + connection.stop(); + broker.stop(); + broker.waitUntilStopped(); + } }