From 82330793e939b521e39a380fea963fc6d28fed9a Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 13 Apr 2016 15:17:14 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5621 Clean up the test, use timeouts and add a few more iterations to try and reproduce recent CI failures from this test. --- ...eZeroPrefetchLazyDispatchPriorityTest.java | 138 +++++++----------- 1 file changed, 52 insertions(+), 86 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java index cff3beeee5..611fb99b2d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,9 +16,14 @@ */ package org.apache.activemq.usecases; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + import java.util.ArrayList; import java.util.Enumeration; import java.util.List; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -27,62 +32,60 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.QueueBrowser; import javax.jms.Session; -import junit.framework.TestCase; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +public class QueueZeroPrefetchLazyDispatchPriorityTest { -public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(QueueZeroPrefetchLazyDispatchPriorityTest.class); + + private final byte[] PAYLOAD = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + private final int ITERATIONS = 10; - static final Logger LOG = LoggerFactory.getLogger(QueueZeroPrefetchLazyDispatchPriorityTest.class); private BrokerService broker; - public static final byte[] PAYLOAD = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - - protected void setUp() throws Exception { + @Before + public void setUp() throws Exception { broker = createBroker(); broker.start(); broker.waitUntilStarted(); } - protected void tearDown() throws Exception { - + @After + public void tearDown() throws Exception { if (broker != null) { broker.stop(); } } - + @Test(timeout=90000) public void testPriorityMessages() throws Exception { + for (int i = 0; i < ITERATIONS; i++) { - for (int i = 0; i < 5; i++) { - - - //send 4 message priority MEDIUM + // send 4 message priority MEDIUM produceMessages(4, 4, "TestQ"); - - //send 1 message priority HIGH + // send 1 message priority HIGH produceMessages(1, 5, "TestQ"); - - LOG.info("On iteration " + i); - + LOG.info("On iteration {}", i); Thread.sleep(500); - // consume messages ArrayList consumeList = consumeMessages("TestQ"); LOG.info("Consumed list " + consumeList.size()); - // compare lists assertEquals("message 1 should be priority high", 5, consumeList.get(0).getJMSPriority()); assertEquals("message 2 should be priority medium", 4, consumeList.get(1).getJMSPriority()); @@ -90,99 +93,91 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { assertEquals("message 4 should be priority medium", 4, consumeList.get(3).getJMSPriority()); assertEquals("message 5 should be priority medium", 4, consumeList.get(4).getJMSPriority()); } - } - + @Test(timeout=120000) public void testPriorityMessagesMoreThanPageSize() throws Exception { - final int numToSend = 450; - for (int i = 0; i < 5; i++) { - + for (int i = 0; i < ITERATIONS; i++) { produceMessages(numToSend - 1, 4, "TestQ"); // ensure we get expiry processing Thread.sleep(700); - - //send 1 message priority HIGH + // send 1 message priority HIGH produceMessages(1, 5, "TestQ"); Thread.sleep(500); - LOG.info("On iteration " + i); + LOG.info("On iteration {}", i); // consume messages ArrayList consumeList = consumeMessages("TestQ"); - LOG.info("Consumed list " + consumeList.size()); - + LOG.info("Consumed list {}", consumeList.size()); // compare lists + assertFalse("Consumed list should not be empty", consumeList.isEmpty()); assertEquals("message 1 should be priority high", 5, consumeList.get(0).getJMSPriority()); for (int j = 1; j < (numToSend - 1); j++) { assertEquals("message " + j + " should be priority medium", 4, consumeList.get(j).getJMSPriority()); } } - } - + @Test(timeout=90000) public void testLongLivedPriorityConsumer() throws Exception { final int numToSend = 150; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); Connection connection = connectionFactory.createConnection(); + try { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(new ActiveMQQueue("TestQ")); connection.start(); - for (int i = 0; i < 5; i++) { - + for (int i = 0; i < ITERATIONS; i++) { produceMessages(numToSend - 1, 4, "TestQ"); - //send 1 message priority HIGH + // send 1 message priority HIGH produceMessages(1, 5, "TestQ"); Message message = consumer.receive(4000); assertEquals("message should be priority high", 5, message.getJMSPriority()); - } } finally { connection.close(); } ArrayList consumeList = consumeMessages("TestQ"); - LOG.info("Consumed list " + consumeList.size()); + LOG.info("Consumed list {}", consumeList.size()); for (Message message : consumeList) { assertEquals("should be priority medium", 4, message.getJMSPriority()); } - } - + @Test(timeout=90000) public void testPriorityMessagesWithJmsBrowser() throws Exception { - final int numToSend = 250; - for (int i = 0; i < 5; i++) { + for (int i = 0; i < ITERATIONS; i++) { produceMessages(numToSend - 1, 4, "TestQ"); ArrayList browsed = browseMessages("TestQ"); - LOG.info("Browsed: " + browsed.size()); + LOG.info("Browsed: {}", browsed.size()); - //send 1 message priority HIGH + // send 1 message priority HIGH produceMessages(1, 5, "TestQ"); Thread.sleep(500); - LOG.info("On iteration " + i); + LOG.info("On iteration {}", i); Message message = consumeOneMessage("TestQ"); assertNotNull(message); @@ -190,29 +185,28 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { // consume messages ArrayList consumeList = consumeMessages("TestQ"); - LOG.info("Consumed list " + consumeList.size()); - + LOG.info("Consumed list {}", consumeList.size()); // compare lists - //assertEquals("Iteration: " + i +", message 1 should be priority high", 5, consumeList.get(0).getJMSPriority()); + // assertEquals("Iteration: " + i + // +", message 1 should be priority high", 5, + // consumeList.get(0).getJMSPriority()); for (int j = 1; j < (numToSend - 1); j++) { assertEquals("Iteration: " + i + ", message " + j + " should be priority medium", 4, consumeList.get(j).getJMSPriority()); } } - } + @Test(timeout=90000) public void testJmsBrowserGetsPagedIn() throws Exception { - - final int numToSend = 10; - for (int i = 0; i < 10; i++) { + for (int i = 0; i < ITERATIONS; i++) { produceMessages(numToSend, 4, "TestQ"); ArrayList browsed = browseMessages("TestQ"); - LOG.info("Browsed: " + browsed.size()); + LOG.info("Browsed: {}", browsed.size()); assertEquals(0, browsed.size()); @@ -221,7 +215,7 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { browsed = browseMessages("TestQ"); - LOG.info("Browsed: " + browsed.size()); + LOG.info("Browsed: {}", browsed.size()); assertEquals("see only the paged in for pull", 1, browsed.size()); @@ -229,41 +223,33 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { ArrayList consumeList = consumeMessages("TestQ"); LOG.info("Consumed list " + consumeList.size()); assertEquals(numToSend, consumeList.size()); - } - } - private void produceMessages(int numberOfMessages, int priority, String queueName) throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); connectionFactory.setConnectionIDPrefix("pri-" + priority); Connection connection = connectionFactory.createConnection(); + try { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(new ActiveMQQueue(queueName)); connection.start(); - for (int i = 0; i < numberOfMessages; i++) { BytesMessage m = session.createBytesMessage(); m.writeBytes(PAYLOAD); m.setJMSPriority(priority); producer.send(m, Message.DEFAULT_DELIVERY_MODE, m.getJMSPriority(), Message.DEFAULT_TIME_TO_LIVE); } - } finally { - if (connection != null) { connection.close(); } } } - private ArrayList consumeMessages(String queueName) throws Exception { - ArrayList returnedMessages = new ArrayList(); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); @@ -275,7 +261,6 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { boolean finished = false; while (!finished) { - Message message = consumer.receive(1000); if (message == null) { finished = true; @@ -284,20 +269,15 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { if (message != null) { returnedMessages.add(message); } - } consumer.close(); return returnedMessages; - } finally { - if (connection != null) { connection.close(); } } - - } private Message consumeOneMessage(String queueName) throws Exception { @@ -305,7 +285,6 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { } private Message consumeOneMessage(String queueName, int ackMode) throws Exception { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); Connection connection = connectionFactory.createConnection(); try { @@ -313,19 +292,15 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(queueName)); connection.start(); - return consumer.receive(1000); - + return consumer.receive(2000); } finally { - if (connection != null) { connection.close(); } } - } private ArrayList browseMessages(String queueName) throws Exception { - ArrayList returnedMessages = new ArrayList(); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); @@ -335,31 +310,25 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { QueueBrowser consumer = session.createBrowser(new ActiveMQQueue(queueName)); connection.start(); - Enumeration enumeration = consumer.getEnumeration(); + Enumeration enumeration = consumer.getEnumeration(); while (enumeration.hasMoreElements()) { - Message message = (Message) enumeration.nextElement(); returnedMessages.add(message); - } return returnedMessages; - } finally { - if (connection != null) { connection.close(); } } - - } private BrokerService createBroker() throws Exception { BrokerService broker = new BrokerService(); broker.setDeleteAllMessagesOnStartup(true); - //add the policy entries + // add the policy entries PolicyMap policyMap = new PolicyMap(); List entries = new ArrayList(); PolicyEntry pe = new PolicyEntry(); @@ -384,10 +353,7 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { policyMap.setPolicyEntries(entries); broker.setDestinationPolicy(policyMap); - broker.addConnector("tcp://0.0.0.0:0"); return broker; } - - } \ No newline at end of file