diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java new file mode 100644 index 0000000000..c62a2b67c4 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsConnectionStartStopTest; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; + +/** + * Test case intended to demonstrate delivery interruption to queue consumers when + * a JMS selector leaves some messages on the queue (due to use of a JMS Selector) + * + * testNonDiscriminatingConsumer() demonstrates proper functionality for consumers that don't use + * a selector to qualify their input. + * + * testDiscriminatingConsumer() demonstrates the failure condition in which delivery to the consumer + * eventually halts. + * + * The expected behavior is for the delivery to the client to be maintained regardless of the depth + * of the queue, particularly when the messages in the queue do not meet the selector criteria of the + * client. + * + * https://issues.apache.org/activemq/browse/AMQ-2217 + * + */ +public class DiscriminatingConsumerLoadTest extends TestSupport { + + private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory + .getLog(DiscriminatingConsumerLoadTest.class); + + private Connection producerConnection; + private Connection consumerConnection; + private int counterSent = 0; + private int counterReceived = 0; + + public static final String JMSTYPE_EATME = "DiscriminatingLoadClient.EatMe"; + public static final String JMSTYPE_IGNOREME = "DiscriminatingLoadClient.IgnoreMe"; + + private int testSize = 5000; // setting this to a small number will pass all tests + + BrokerService broker; + + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + + // workaround is to ensure sufficient dispatch buffer for the destination + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setMaxPageSize(testSize); + policyMap.setDefaultEntry(defaultPolicy); + broker.setDestinationPolicy(policyMap); + broker.start(); + + super.setUp(); + this.producerConnection = this.createConnection(); + this.consumerConnection = this.createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + if (producerConnection != null) { + producerConnection.close(); + producerConnection = null; + } + if (consumerConnection != null) { + consumerConnection.close(); + consumerConnection = null; + } + super.tearDown(); + broker.stop(); + } + + /** + * Test to check if a single consumer with no JMS selector will receive all intended messages + * + * @throws java.lang.Exception + */ + public void testNonDiscriminatingConsumer() throws Exception { + + consumerConnection = createConnection(); + consumerConnection.start(); + LOG.info("consumerConnection = " +consumerConnection); + + try {Thread.sleep(1000); } catch (Exception e) {} + + // here we pass in null for the JMS selector + Consumer consumer = new Consumer(consumerConnection, null); + Thread consumerThread = new Thread(consumer); + + consumerThread.start(); + + producerConnection = createConnection(); + producerConnection.start(); + LOG.info("producerConnection = " +producerConnection); + + try {Thread.sleep(3000); } catch (Exception e) {} + + Producer producer = new Producer(producerConnection); + Thread producerThread = new Thread(producer); + producerThread.start(); + + // now that everything is running, let's wait for the consumer thread to finish ... + consumerThread.join(); + producer.stop = true; + + if (consumer.getCount() == testSize ) + LOG.info("test complete .... all messsages consumed!!"); + else + LOG.info("test failed .... Sent " + (testSize / 1) + + " messages intended to be consumed ( " + testSize + " total), but only consumed " + consumer.getCount()); + + + assertTrue("Sent " + testSize + " messages intended to be consumed, but only consumed " + consumer.getCount(), + (consumer.getCount() == testSize )); + assertFalse("Delivery of messages to consumer was halted during this test", consumer.deliveryHalted()); + + + } + + /** + * Test to check if a single consumer with a JMS selector will receive all intended messages + * + * @throws java.lang.Exception + */ + public void testDiscriminatingConsumer() throws Exception { + + consumerConnection = createConnection(); + consumerConnection.start(); + LOG.info("consumerConnection = " +consumerConnection); + + try {Thread.sleep(1000); } catch (Exception e) {} + + // here we pass the JMS selector we intend to consume + Consumer consumer = new Consumer(consumerConnection, JMSTYPE_EATME); + Thread consumerThread = new Thread(consumer); + + consumerThread.start(); + + producerConnection = createConnection(); + producerConnection.start(); + LOG.info("producerConnection = " +producerConnection); + + try {Thread.sleep(3000); } catch (Exception e) {} + + Producer producer = new Producer(producerConnection); + Thread producerThread = new Thread(producer); + producerThread.start(); + + // now that everything is running, let's wait for the consumer thread to finish ... + consumerThread.join(); + producer.stop = true; + + if (consumer.getCount() == (testSize / 2)) + { + LOG.info("test complete .... all messsages consumed!!"); + } + else + { + LOG.info("test failed .... Sent " + testSize + " original messages, only half of which (" + (testSize / 2) + + ") were intended to be consumed: consumer paused at: " + consumer.getCount()); + //System.out.println("test failed .... Sent " + testSize + " original messages, only half of which (" + (testSize / 2) + + // ") were intended to be consumed: consumer paused at: " + consumer.getCount()); + + } + + assertTrue("Sent " + testSize + " original messages, only half of which (" + (testSize / 2) + + ") were intended to be consumed: consumer paused at: " + consumer.getCount(), + (consumer.getCount() == (testSize / 2))); + assertTrue("Delivery of messages to consumer was halted during this test as it only wants half", consumer.deliveryHalted()); + } + + /** + * Helper class that will publish 2 * testSize messages. The messages will be distributed evenly + * between the following two JMS types: + * + * @see JMSTYPE_INTENDED_FOR_CONSUMPTION + * @see JMSTYPE_NOT_INTENDED_FOR_CONSUMPTION + * + * @author jlyons + * + */ + private class Producer extends Thread + { + private int counterSent = 0; + private Connection connection = null; + public boolean stop = false; + + public Producer(Connection connection) + { + this.connection = connection; + } + + public void run() { + try { + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue("test"); + + // wait for 10 seconds to allow consumer.receive to be run + // first + Thread.sleep(10000); + MessageProducer producer = session.createProducer(queue); + + while (!stop && (counterSent < testSize)) + { + // first send a message intended to be consumed .... + TextMessage message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ... + message.setJMSType(JMSTYPE_EATME); + //LOG.info("sending .... JMSType = " + message.getJMSType()); + producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000); + + counterSent++; + + // now send a message intended to be consumed by some other consumer in the the future + // ... we expect these messages to accrue in the queue + message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ... + message.setJMSType(JMSTYPE_IGNOREME); + //LOG.info("sending .... JMSType = " + message.getJMSType()); + producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000); + + counterSent++; + } + + session.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("producer thread complete ... " + counterSent + " messages sent to the queue"); + } + + public int getCount() + { + return this.counterSent; + } + + } + + /** + * Helper class that will consume messages from the queue based on the supplied JMS selector. + * Thread will stop after the first receive(..) timeout, or once all expected messages have + * been received (see testSize). If the thread stops due to a timeout, it is experiencing the + * delivery pause that is symptomatic of a bug in the broker. + * + * @author jlyons + * + */ + private class Consumer extends Thread + { + protected int counterReceived = 0; + private Connection connection = null; + private String jmsSelector = null; + private boolean deliveryHalted = false; + + public Consumer(Connection connection, String jmsSelector) + { + this.connection = connection; + this.jmsSelector = jmsSelector; + } + + public void run() { + boolean testComplete = false; + try { + Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue("test"); + MessageConsumer consumer = null; + if (null != this.jmsSelector) + { + consumer = session.createConsumer(queue, "JMSType='" + this.jmsSelector + "'"); + } + else + { + consumer = session.createConsumer(queue); + } + + while (!deliveryHalted && (counterReceived < testSize)) + { + TextMessage result = (TextMessage) consumer.receive(30000); + if (result != null) { + counterReceived++; + //System.out.println("consuming .... JMSType = " + result.getJMSType() + " received = " + counterReceived); + LOG.info("consuming .... JMSType = " + result.getJMSType() + " received = " + counterReceived); + } else + { + LOG.info("consuming .... timeout while waiting for a message ... broker must have stopped delivery ... received = " + counterReceived); + deliveryHalted = true; + } + } + session.close(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + public int getCount() + { + return this.counterReceived; + } + + public boolean deliveryHalted() + { + return this.deliveryHalted; + } + } + +}