diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java index 23b9e80356..ccb1bf39c1 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java @@ -55,102 +55,102 @@ import org.slf4j.LoggerFactory; /** * Modified CursorSupport Unit test to reproduce the negative queue issue. - * + * * Keys to reproducing: * 1) Consecutive queues with listener on first sending to second queue * 2) Push each queue to the memory limit * This seems to help reproduce the issue more consistently, but * we have seen times in our production environment where the * negative queue can occur without. Our memory limits are - * very high in production and it still happens in varying + * very high in production and it still happens in varying * frequency. * 3) Prefetch - * Lowering the prefetch down to 10 and below seems to help - * reduce occurrences. + * Lowering the prefetch down to 10 and below seems to help + * reduce occurrences. * 4) # of consumers per queue * The issue occurs less with fewer consumers - * + * * Things that do not affect reproduction: * 1) Spring - we use spring in our production applications, but this test case works * with or without it. * 2) transacted - * + * */ public class NegativeQueueTest extends AutoFailTestSupport { private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class); - + public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS"); - + private static final String QUEUE_1_NAME = "conn.test.queue.1"; private static final String QUEUE_2_NAME = "conn.test.queue.2"; - + private static final long QUEUE_MEMORY_LIMIT = 2097152; private static final long MEMORY_USAGE = 400000000; private static final long TEMP_USAGE = 200000000; private static final long STORE_USAGE = 1000000000; private static final int MESSAGE_COUNT = 1100; - + protected static final boolean TRANSACTED = true; protected static final boolean DEBUG = true; - protected static int NUM_CONSUMERS = 20; - protected static int PREFETCH_SIZE = 1000; - + protected static int NUM_CONSUMERS = 20; + protected static int PREFETCH_SIZE = 1000; + protected BrokerService broker; protected String bindAddress = "tcp://localhost:0"; - + public void testWithDefaultPrefetch() throws Exception{ PREFETCH_SIZE = 1000; NUM_CONSUMERS = 20; blastAndConsume(); } - + public void x_testWithDefaultPrefetchFiveConsumers() throws Exception{ PREFETCH_SIZE = 1000; NUM_CONSUMERS = 5; blastAndConsume(); } - + public void x_testWithDefaultPrefetchTwoConsumers() throws Exception{ PREFETCH_SIZE = 1000; NUM_CONSUMERS = 2; blastAndConsume(); } - + public void testWithDefaultPrefetchOneConsumer() throws Exception{ PREFETCH_SIZE = 1000; NUM_CONSUMERS = 1; blastAndConsume(); } - + public void testWithMediumPrefetch() throws Exception{ PREFETCH_SIZE = 50; NUM_CONSUMERS = 20; blastAndConsume(); - } - + } + public void x_testWithSmallPrefetch() throws Exception{ PREFETCH_SIZE = 10; NUM_CONSUMERS = 20; blastAndConsume(); } - + public void testWithNoPrefetch() throws Exception{ PREFETCH_SIZE = 1; NUM_CONSUMERS = 20; blastAndConsume(); } - + public void blastAndConsume() throws Exception { LOG.info(getName()); ConnectionFactory factory = createConnectionFactory(); - + //get proxy queues for statistics lookups Connection proxyConnection = factory.createConnection(); proxyConnection.start(); Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_1_NAME)); - final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_2_NAME)); - + final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_1_NAME)); + final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_2_NAME)); + // LOAD THE QUEUE Connection producerConnection = factory.createConnection(); producerConnection.start(); @@ -168,7 +168,7 @@ public class NegativeQueueTest extends AutoFailTestSupport { System.out.print(index-((index/10)*10)); } } - + //get access to the Queue info if(DEBUG){ System.out.println(""); @@ -176,16 +176,16 @@ public class NegativeQueueTest extends AutoFailTestSupport { System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage()); System.out.println("Queue1 Memory Available = "+proxyQueue1.getMemoryLimit()); } - + // FLUSH THE QUEUE final CountDownLatch latch1 = new CountDownLatch(1); - final CountDownLatch latch2 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS]; List consumerList1 = new ArrayList(); Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS]; Connection[] producerConnections2 = new Connection[NUM_CONSUMERS]; List consumerList2 = new ArrayList(); - + for(int ix=0; ix consumerList; - private CountDownLatch latch; - private Session consumerSession; + private final List consumerList; + private final CountDownLatch latch; + private final Session consumerSession; private Session producerSession; private MessageProducer producer; - + public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List consumerList){ this(null, consumerSession, null, latch, consumerList); } - - public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName, + + public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName, CountDownLatch latch, List consumerList){ this.consumerList = consumerList; this.latch = latch; this.consumerSession = consumerSession; - + if(producerConnection != null){ try { producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); @@ -377,7 +381,8 @@ public class NegativeQueueTest extends AutoFailTestSupport { } } } - + + @Override public void onMessage(Message msg) { try { if(producer == null){ @@ -392,7 +397,7 @@ public class NegativeQueueTest extends AutoFailTestSupport { } catch (Exception e) { e.printStackTrace(); } - + synchronized(consumerList){ consumerList.add(msg); if(DEBUG && consumerList.size()%100 == 0) { @@ -411,5 +416,5 @@ public class NegativeQueueTest extends AutoFailTestSupport { } } } - } + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java index 39de02d6ed..b9cfbd9c3c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java @@ -98,7 +98,8 @@ public class AMQ2513Test extends TestCase { DestinationViewMBean createView() throws Exception { String domain = "org.apache.activemq"; - ObjectName name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test"); + ObjectName name = new ObjectName(domain + ":type=Broker,brokerName=localhost," + + "destinationType=Queue,destinationName=test"); return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index 618dcd785f..f57a05b69d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -163,10 +163,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { TimeUnit.SECONDS.sleep(5); - for (ObjectName name : broker.getAdminView().getQueues()) { - LOG.info("Broker Queue: {}", name); - } - final DestinationViewMBean view = createView(destination); Wait.waitFor(new Wait.Condition() { @Override @@ -581,8 +577,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test"); } - LOG.info("Attempting to find Queue named: {}", name); - return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); }