fix test cases after changes in https://issues.apache.org/jira/browse/AMQ-4237 broker the tests queue MBean lookup

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1427878 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-02 17:45:19 +00:00
parent 64f3c55d80
commit 19bf943dd7
3 changed files with 71 additions and 71 deletions

View File

@ -55,102 +55,102 @@ import org.slf4j.LoggerFactory;
/** /**
* Modified CursorSupport Unit test to reproduce the negative queue issue. * Modified CursorSupport Unit test to reproduce the negative queue issue.
* *
* Keys to reproducing: * Keys to reproducing:
* 1) Consecutive queues with listener on first sending to second queue * 1) Consecutive queues with listener on first sending to second queue
* 2) Push each queue to the memory limit * 2) Push each queue to the memory limit
* This seems to help reproduce the issue more consistently, but * This seems to help reproduce the issue more consistently, but
* we have seen times in our production environment where the * we have seen times in our production environment where the
* negative queue can occur without. Our memory limits are * negative queue can occur without. Our memory limits are
* very high in production and it still happens in varying * very high in production and it still happens in varying
* frequency. * frequency.
* 3) Prefetch * 3) Prefetch
* Lowering the prefetch down to 10 and below seems to help * Lowering the prefetch down to 10 and below seems to help
* reduce occurrences. * reduce occurrences.
* 4) # of consumers per queue * 4) # of consumers per queue
* The issue occurs less with fewer consumers * The issue occurs less with fewer consumers
* *
* Things that do not affect reproduction: * Things that do not affect reproduction:
* 1) Spring - we use spring in our production applications, but this test case works * 1) Spring - we use spring in our production applications, but this test case works
* with or without it. * with or without it.
* 2) transacted * 2) transacted
* *
*/ */
public class NegativeQueueTest extends AutoFailTestSupport { public class NegativeQueueTest extends AutoFailTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class); private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class);
public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS"); public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
private static final String QUEUE_1_NAME = "conn.test.queue.1"; private static final String QUEUE_1_NAME = "conn.test.queue.1";
private static final String QUEUE_2_NAME = "conn.test.queue.2"; private static final String QUEUE_2_NAME = "conn.test.queue.2";
private static final long QUEUE_MEMORY_LIMIT = 2097152; private static final long QUEUE_MEMORY_LIMIT = 2097152;
private static final long MEMORY_USAGE = 400000000; private static final long MEMORY_USAGE = 400000000;
private static final long TEMP_USAGE = 200000000; private static final long TEMP_USAGE = 200000000;
private static final long STORE_USAGE = 1000000000; private static final long STORE_USAGE = 1000000000;
private static final int MESSAGE_COUNT = 1100; private static final int MESSAGE_COUNT = 1100;
protected static final boolean TRANSACTED = true; protected static final boolean TRANSACTED = true;
protected static final boolean DEBUG = true; protected static final boolean DEBUG = true;
protected static int NUM_CONSUMERS = 20; protected static int NUM_CONSUMERS = 20;
protected static int PREFETCH_SIZE = 1000; protected static int PREFETCH_SIZE = 1000;
protected BrokerService broker; protected BrokerService broker;
protected String bindAddress = "tcp://localhost:0"; protected String bindAddress = "tcp://localhost:0";
public void testWithDefaultPrefetch() throws Exception{ public void testWithDefaultPrefetch() throws Exception{
PREFETCH_SIZE = 1000; PREFETCH_SIZE = 1000;
NUM_CONSUMERS = 20; NUM_CONSUMERS = 20;
blastAndConsume(); blastAndConsume();
} }
public void x_testWithDefaultPrefetchFiveConsumers() throws Exception{ public void x_testWithDefaultPrefetchFiveConsumers() throws Exception{
PREFETCH_SIZE = 1000; PREFETCH_SIZE = 1000;
NUM_CONSUMERS = 5; NUM_CONSUMERS = 5;
blastAndConsume(); blastAndConsume();
} }
public void x_testWithDefaultPrefetchTwoConsumers() throws Exception{ public void x_testWithDefaultPrefetchTwoConsumers() throws Exception{
PREFETCH_SIZE = 1000; PREFETCH_SIZE = 1000;
NUM_CONSUMERS = 2; NUM_CONSUMERS = 2;
blastAndConsume(); blastAndConsume();
} }
public void testWithDefaultPrefetchOneConsumer() throws Exception{ public void testWithDefaultPrefetchOneConsumer() throws Exception{
PREFETCH_SIZE = 1000; PREFETCH_SIZE = 1000;
NUM_CONSUMERS = 1; NUM_CONSUMERS = 1;
blastAndConsume(); blastAndConsume();
} }
public void testWithMediumPrefetch() throws Exception{ public void testWithMediumPrefetch() throws Exception{
PREFETCH_SIZE = 50; PREFETCH_SIZE = 50;
NUM_CONSUMERS = 20; NUM_CONSUMERS = 20;
blastAndConsume(); blastAndConsume();
} }
public void x_testWithSmallPrefetch() throws Exception{ public void x_testWithSmallPrefetch() throws Exception{
PREFETCH_SIZE = 10; PREFETCH_SIZE = 10;
NUM_CONSUMERS = 20; NUM_CONSUMERS = 20;
blastAndConsume(); blastAndConsume();
} }
public void testWithNoPrefetch() throws Exception{ public void testWithNoPrefetch() throws Exception{
PREFETCH_SIZE = 1; PREFETCH_SIZE = 1;
NUM_CONSUMERS = 20; NUM_CONSUMERS = 20;
blastAndConsume(); blastAndConsume();
} }
public void blastAndConsume() throws Exception { public void blastAndConsume() throws Exception {
LOG.info(getName()); LOG.info(getName());
ConnectionFactory factory = createConnectionFactory(); ConnectionFactory factory = createConnectionFactory();
//get proxy queues for statistics lookups //get proxy queues for statistics lookups
Connection proxyConnection = factory.createConnection(); Connection proxyConnection = factory.createConnection();
proxyConnection.start(); proxyConnection.start();
Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_1_NAME)); final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_1_NAME));
final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean((Queue)proxySession.createQueue(QUEUE_2_NAME)); final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_2_NAME));
// LOAD THE QUEUE // LOAD THE QUEUE
Connection producerConnection = factory.createConnection(); Connection producerConnection = factory.createConnection();
producerConnection.start(); producerConnection.start();
@ -168,7 +168,7 @@ public class NegativeQueueTest extends AutoFailTestSupport {
System.out.print(index-((index/10)*10)); System.out.print(index-((index/10)*10));
} }
} }
//get access to the Queue info //get access to the Queue info
if(DEBUG){ if(DEBUG){
System.out.println(""); System.out.println("");
@ -176,16 +176,16 @@ public class NegativeQueueTest extends AutoFailTestSupport {
System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage()); System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage());
System.out.println("Queue1 Memory Available = "+proxyQueue1.getMemoryLimit()); System.out.println("Queue1 Memory Available = "+proxyQueue1.getMemoryLimit());
} }
// FLUSH THE QUEUE // FLUSH THE QUEUE
final CountDownLatch latch1 = new CountDownLatch(1); final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1); final CountDownLatch latch2 = new CountDownLatch(1);
Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS]; Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS];
List<Message> consumerList1 = new ArrayList<Message>(); List<Message> consumerList1 = new ArrayList<Message>();
Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS]; Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS];
Connection[] producerConnections2 = new Connection[NUM_CONSUMERS]; Connection[] producerConnections2 = new Connection[NUM_CONSUMERS];
List<Message> consumerList2 = new ArrayList<Message>(); List<Message> consumerList2 = new ArrayList<Message>();
for(int ix=0; ix<NUM_CONSUMERS; ix++){ for(int ix=0; ix<NUM_CONSUMERS; ix++){
producerConnections2[ix] = factory.createConnection(); producerConnections2[ix] = factory.createConnection();
producerConnections2[ix].start(); producerConnections2[ix].start();
@ -194,7 +194,7 @@ public class NegativeQueueTest extends AutoFailTestSupport {
MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_1_NAME)); MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_1_NAME));
consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1)); consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
} }
latch1.await(200000, TimeUnit.MILLISECONDS); latch1.await(200000, TimeUnit.MILLISECONDS);
if(DEBUG){ if(DEBUG){
System.out.println(""); System.out.println("");
@ -202,15 +202,16 @@ public class NegativeQueueTest extends AutoFailTestSupport {
System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage()); System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage());
System.out.println("Queue2 Memory Available = "+proxyQueue2.getMemoryLimit()); System.out.println("Queue2 Memory Available = "+proxyQueue2.getMemoryLimit());
} }
for(int ix=0; ix<NUM_CONSUMERS; ix++){ for(int ix=0; ix<NUM_CONSUMERS; ix++){
consumerConnections2[ix] = getConsumerConnection(factory); consumerConnections2[ix] = getConsumerConnection(factory);
Session consumerSession = consumerConnections2[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); Session consumerSession = consumerConnections2[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_2_NAME)); MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_2_NAME));
consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2)); consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2));
} }
boolean success = Wait.waitFor(new Wait.Condition() { boolean success = Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
boolean done = latch2.await(10, TimeUnit.SECONDS); boolean done = latch2.await(10, TimeUnit.SECONDS);
if(DEBUG){ if(DEBUG){
@ -235,10 +236,10 @@ public class NegativeQueueTest extends AutoFailTestSupport {
consumerConnections2[ix].close(); consumerConnections2[ix].close();
producerConnections2[ix].close(); producerConnections2[ix].close();
} }
//let the consumer statistics on queue2 have time to update //let the consumer statistics on queue2 have time to update
Thread.sleep(500); Thread.sleep(500);
if(DEBUG){ if(DEBUG){
System.out.println(""); System.out.println("");
System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize()); System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize());
@ -248,37 +249,39 @@ public class NegativeQueueTest extends AutoFailTestSupport {
} }
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 0 == proxyQueue1.getQueueSize(); return 0 == proxyQueue1.getQueueSize();
}}); }});
assertEquals("Queue1 has gone negative,",0, proxyQueue1.getQueueSize()); assertEquals("Queue1 has gone negative,",0, proxyQueue1.getQueueSize());
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 0 == proxyQueue2.getQueueSize(); return 0 == proxyQueue2.getQueueSize();
}}); }});
assertEquals("Queue2 has gone negative,",0, proxyQueue2.getQueueSize()); assertEquals("Queue2 has gone negative,",0, proxyQueue2.getQueueSize());
proxyConnection.close(); proxyConnection.close();
} }
private QueueViewMBean getProxyToQueueViewMBean(Queue queue) private QueueViewMBean getProxyToQueueViewMBean(Queue queue) throws MalformedObjectNameException, JMSException {
throws MalformedObjectNameException, JMSException { final String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":Type=Queue,Destination=" + ObjectName queueViewMBeanName = new ObjectName(prefix + queue.getQueueName());
queue.getQueueName() + ",BrokerName=localhost"); QueueViewMBean proxy = (QueueViewMBean)
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean.class, true);
return proxy; return proxy;
} }
protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException { protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
Connection connection = fac.createConnection(); Connection connection = fac.createConnection();
connection.start(); connection.start();
return connection; return connection;
} }
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
if (broker == null) { if (broker == null) {
broker = createBroker(); broker = createBroker();
@ -286,6 +289,7 @@ public class NegativeQueueTest extends AutoFailTestSupport {
super.setUp(); super.setUp();
} }
@Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
super.tearDown(); super.tearDown();
if (broker != null) { if (broker != null) {
@ -312,18 +316,18 @@ public class NegativeQueueTest extends AutoFailTestSupport {
bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString(); bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString();
return answer; return answer;
} }
protected void configureBroker(BrokerService answer) throws Exception { protected void configureBroker(BrokerService answer) throws Exception {
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setMemoryLimit(QUEUE_MEMORY_LIMIT); policy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy()); policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
// disable the cache to be sure setBatch is the problem // disable the cache to be sure setBatch is the problem
// will get lots of duplicates // will get lots of duplicates
// real problem is sync between cursor and store add - leads to out or order messages // real problem is sync between cursor and store add - leads to out or order messages
// in the cursor so setBatch can break. // in the cursor so setBatch can break.
// policy.setUseCache(false); // policy.setUseCache(false);
PolicyMap pMap = new PolicyMap(); PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy); pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap); answer.setDestinationPolicy(pMap);
@ -331,14 +335,14 @@ public class NegativeQueueTest extends AutoFailTestSupport {
answer.addConnector("tcp://localhost:0"); answer.addConnector("tcp://localhost:0");
MemoryUsage memoryUsage = new MemoryUsage(); MemoryUsage memoryUsage = new MemoryUsage();
memoryUsage.setLimit(MEMORY_USAGE); memoryUsage.setLimit(MEMORY_USAGE);
memoryUsage.setPercentUsageMinDelta(20); memoryUsage.setPercentUsageMinDelta(20);
TempUsage tempUsage = new TempUsage(); TempUsage tempUsage = new TempUsage();
tempUsage.setLimit(TEMP_USAGE); tempUsage.setLimit(TEMP_USAGE);
StoreUsage storeUsage = new StoreUsage(); StoreUsage storeUsage = new StoreUsage();
storeUsage.setLimit(STORE_USAGE); storeUsage.setLimit(STORE_USAGE);
SystemUsage systemUsage = new SystemUsage(); SystemUsage systemUsage = new SystemUsage();
systemUsage.setMemoryUsage(memoryUsage); systemUsage.setMemoryUsage(memoryUsage);
@ -346,27 +350,27 @@ public class NegativeQueueTest extends AutoFailTestSupport {
systemUsage.setStoreUsage(storeUsage); systemUsage.setStoreUsage(storeUsage);
answer.setSystemUsage(systemUsage); answer.setSystemUsage(systemUsage);
} }
/** /**
* Message listener that is given the Session for transacted consumers * Message listener that is given the Session for transacted consumers
*/ */
class SessionAwareMessageListener implements MessageListener{ class SessionAwareMessageListener implements MessageListener{
private List<Message> consumerList; private final List<Message> consumerList;
private CountDownLatch latch; private final CountDownLatch latch;
private Session consumerSession; private final Session consumerSession;
private Session producerSession; private Session producerSession;
private MessageProducer producer; private MessageProducer producer;
public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList){ public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList){
this(null, consumerSession, null, latch, consumerList); this(null, consumerSession, null, latch, consumerList);
} }
public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName, public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName,
CountDownLatch latch, List<Message> consumerList){ CountDownLatch latch, List<Message> consumerList){
this.consumerList = consumerList; this.consumerList = consumerList;
this.latch = latch; this.latch = latch;
this.consumerSession = consumerSession; this.consumerSession = consumerSession;
if(producerConnection != null){ if(producerConnection != null){
try { try {
producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
@ -377,7 +381,8 @@ public class NegativeQueueTest extends AutoFailTestSupport {
} }
} }
} }
@Override
public void onMessage(Message msg) { public void onMessage(Message msg) {
try { try {
if(producer == null){ if(producer == null){
@ -392,7 +397,7 @@ public class NegativeQueueTest extends AutoFailTestSupport {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
synchronized(consumerList){ synchronized(consumerList){
consumerList.add(msg); consumerList.add(msg);
if(DEBUG && consumerList.size()%100 == 0) { if(DEBUG && consumerList.size()%100 == 0) {
@ -411,5 +416,5 @@ public class NegativeQueueTest extends AutoFailTestSupport {
} }
} }
} }
} }
} }

View File

@ -98,7 +98,8 @@ public class AMQ2513Test extends TestCase {
DestinationViewMBean createView() throws Exception { DestinationViewMBean createView() throws Exception {
String domain = "org.apache.activemq"; String domain = "org.apache.activemq";
ObjectName name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test"); ObjectName name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
"destinationType=Queue,destinationName=test");
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
true); true);
} }

View File

@ -163,10 +163,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
for (ObjectName name : broker.getAdminView().getQueues()) {
LOG.info("Broker Queue: {}", name);
}
final DestinationViewMBean view = createView(destination); final DestinationViewMBean view = createView(destination);
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override @Override
@ -581,8 +577,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test"); name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test");
} }
LOG.info("Attempting to find Queue named: {}", name);
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
} }