From c2922ad1fcf85a72e69b294efeea3ed7720e2adb Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 26 May 2009 08:49:11 +0000 Subject: [PATCH] Added test case from Richard Yager for https://issues.apache.org/activemq/browse/AMQ-1918 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@778622 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/cursors/CursorSupport.java | 16 +- .../region/cursors/NegativeQueueTest.java | 375 ++++++++++++++++++ 2 files changed, 380 insertions(+), 11 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java index a4be3bdd03..d1b4a8147f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java @@ -16,6 +16,11 @@ */ package org.apache.activemq.broker.region.cursors; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -23,7 +28,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -33,16 +37,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import junit.framework.Test; -import junit.framework.TestCase; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.CombinationTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerTest; -import org.apache.activemq.broker.region.Queue; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** * @version $Revision: 1.3 $ */ diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java new file mode 100644 index 0000000000..18424bb672 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.StoreUsage; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.usage.TempUsage; + +/** + * Modified CursorSupport Unit test to reproduce the negative queue issue. + * + * Keys to reproducing: + * 1) Consecutive queues with listener on first sending to second queue + * 2) Push each queue to the memory limit + * This seems to help reproduce the issue more consistently, but + * we have seen times in our production environment where the + * negative queue can occur without. Our memory limits are + * very high in production and it still happens in varying + * frequency. + * 3) Prefetch + * Lowering the prefetch down to 10 and below seems to help + * reduce occurrences. + * 4) # of consumers per queue + * The issue occurs less with fewer consumers + * + * Things that do not affect reproduction: + * 1) Spring - we use spring in our production applications, but this test case works + * with or without it. + * 2) transacted + * + */ +public class NegativeQueueTest extends TestCase { + + public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS"); + + private static final String QUEUE_1_NAME = "conn.test.queue.1"; + private static final String QUEUE_2_NAME = "conn.test.queue.2"; + + private static final long QUEUE_MEMORY_LIMIT = 2097152; + private static final long MEMORY_USAGE = 400000000; + private static final long TEMP_USAGE = 200000000; + private static final long STORE_USAGE = 1000000000; + private static final int MESSAGE_COUNT = 2000; + + protected static final boolean TRANSACTED = true; + protected static final boolean DEBUG = false; + protected static int NUM_CONSUMERS = 20; + protected static int PREFETCH_SIZE = 1000; + + protected BrokerService broker; + protected String bindAddress = "tcp://localhost:60706"; + + public void testWithDefaultPrefetch() throws Exception{ + PREFETCH_SIZE = 1000; + NUM_CONSUMERS = 20; + blastAndConsume(); + } + + public void testWithDefaultPrefetchFiveConsumers() throws Exception{ + PREFETCH_SIZE = 1000; + NUM_CONSUMERS = 5; + blastAndConsume(); + } + + public void testWithDefaultPrefetchTwoConsumers() throws Exception{ + PREFETCH_SIZE = 1000; + NUM_CONSUMERS = 2; + blastAndConsume(); + } + + public void testWithDefaultPrefetchOneConsumer() throws Exception{ + PREFETCH_SIZE = 1000; + NUM_CONSUMERS = 1; + blastAndConsume(); + } + + public void testWithMediumPrefetch() throws Exception{ + PREFETCH_SIZE = 50; + NUM_CONSUMERS = 20; + blastAndConsume(); + } + + public void testWithSmallPrefetch() throws Exception{ + PREFETCH_SIZE = 10; + NUM_CONSUMERS = 20; + blastAndConsume(); + } + + public void testWithNoPrefetch() throws Exception{ + PREFETCH_SIZE = 1; + blastAndConsume(); + } + + public void blastAndConsume() throws Exception { + ConnectionFactory factory = createConnectionFactory(); + + //get proxy queues for statistics lookups + Connection proxyConnection = factory.createConnection(); + proxyConnection.start(); + Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_1_NAME)); + final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_2_NAME)); + + // LOAD THE QUEUE + Connection producerConnection = factory.createConnection(); + producerConnection.start(); + Session session = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); + Destination queue = session.createQueue(QUEUE_1_NAME); + MessageProducer producer = session.createProducer(queue); + List senderList = new ArrayList(); + for (int i = 0; i < MESSAGE_COUNT; i++) { + TextMessage msg = session.createTextMessage(formatter.format(new Date())); + senderList.add(msg); + producer.send(msg); + if(TRANSACTED) session.commit(); + if(DEBUG && i%100 == 0){ + int index = (i/100)+1; + System.out.print(index-((index/10)*10)); + } + } + + //get access to the Queue info + if(DEBUG){ + System.out.println(""); + System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize()); + System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage()); + System.out.println("Queue1 Memory Available = "+proxyQueue1.getMemoryLimit()); + } + + // FLUSH THE QUEUE + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS]; + List consumerList1 = new ArrayList(); + Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS]; + Connection[] producerConnections2 = new Connection[NUM_CONSUMERS]; + List consumerList2 = new ArrayList(); + + for(int ix=0; ix consumerList; + private CountDownLatch latch; + private Session consumerSession; + private Session producerSession; + private MessageProducer producer; + + public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List consumerList){ + this(null, consumerSession, null, latch, consumerList); + } + + public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName, + CountDownLatch latch, List consumerList){ + this.consumerList = consumerList; + this.latch = latch; + this.consumerSession = consumerSession; + + if(producerConnection != null){ + try { + producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); + Destination queue = producerSession.createQueue(outQueueName); + producer = producerSession.createProducer(queue); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + + public void onMessage(Message msg) { + try { + if(producer == null){ + // sleep to act as a slow consumer + // which will force a mix of direct and polled dispatching + // using the cursor on the broker + Thread.sleep(50); + }else{ + producer.send(msg); + if(TRANSACTED) producerSession.commit(); + } + } catch (Exception e) { + e.printStackTrace(); + } + + synchronized(consumerList){ + consumerList.add(msg); + if(DEBUG && consumerList.size()%100 == 0) { + int index = consumerList.size()/100; + System.out.print(index-((index/10)*10)); + } + if (consumerList.size() == MESSAGE_COUNT) { + latch.countDown(); + } + } + if(TRANSACTED){ + try { + consumerSession.commit(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + } +} \ No newline at end of file