mirror of https://github.com/apache/activemq.git
commit test case with policy maxPageSize workaround in place that demonstrates it works as expected - https://issues.apache.org/activemq/browse/AMQ-2217
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@995381 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
adb76e5eb1
commit
9f0c86cc6d
|
@ -0,0 +1,336 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.JmsConnectionStartStopTest;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
|
||||
/**
|
||||
* Test case intended to demonstrate delivery interruption to queue consumers when
|
||||
* a JMS selector leaves some messages on the queue (due to use of a JMS Selector)
|
||||
*
|
||||
* testNonDiscriminatingConsumer() demonstrates proper functionality for consumers that don't use
|
||||
* a selector to qualify their input.
|
||||
*
|
||||
* testDiscriminatingConsumer() demonstrates the failure condition in which delivery to the consumer
|
||||
* eventually halts.
|
||||
*
|
||||
* The expected behavior is for the delivery to the client to be maintained regardless of the depth
|
||||
* of the queue, particularly when the messages in the queue do not meet the selector criteria of the
|
||||
* client.
|
||||
*
|
||||
* https://issues.apache.org/activemq/browse/AMQ-2217
|
||||
*
|
||||
*/
|
||||
public class DiscriminatingConsumerLoadTest extends TestSupport {
|
||||
|
||||
private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
|
||||
.getLog(DiscriminatingConsumerLoadTest.class);
|
||||
|
||||
private Connection producerConnection;
|
||||
private Connection consumerConnection;
|
||||
private int counterSent = 0;
|
||||
private int counterReceived = 0;
|
||||
|
||||
public static final String JMSTYPE_EATME = "DiscriminatingLoadClient.EatMe";
|
||||
public static final String JMSTYPE_IGNOREME = "DiscriminatingLoadClient.IgnoreMe";
|
||||
|
||||
private int testSize = 5000; // setting this to a small number will pass all tests
|
||||
|
||||
BrokerService broker;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setPersistent(false);
|
||||
|
||||
// workaround is to ensure sufficient dispatch buffer for the destination
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultPolicy = new PolicyEntry();
|
||||
defaultPolicy.setMaxPageSize(testSize);
|
||||
policyMap.setDefaultEntry(defaultPolicy);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
broker.start();
|
||||
|
||||
super.setUp();
|
||||
this.producerConnection = this.createConnection();
|
||||
this.consumerConnection = this.createConnection();
|
||||
}
|
||||
|
||||
/**
|
||||
* @see junit.framework.TestCase#tearDown()
|
||||
*/
|
||||
protected void tearDown() throws Exception {
|
||||
if (producerConnection != null) {
|
||||
producerConnection.close();
|
||||
producerConnection = null;
|
||||
}
|
||||
if (consumerConnection != null) {
|
||||
consumerConnection.close();
|
||||
consumerConnection = null;
|
||||
}
|
||||
super.tearDown();
|
||||
broker.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check if a single consumer with no JMS selector will receive all intended messages
|
||||
*
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
public void testNonDiscriminatingConsumer() throws Exception {
|
||||
|
||||
consumerConnection = createConnection();
|
||||
consumerConnection.start();
|
||||
LOG.info("consumerConnection = " +consumerConnection);
|
||||
|
||||
try {Thread.sleep(1000); } catch (Exception e) {}
|
||||
|
||||
// here we pass in null for the JMS selector
|
||||
Consumer consumer = new Consumer(consumerConnection, null);
|
||||
Thread consumerThread = new Thread(consumer);
|
||||
|
||||
consumerThread.start();
|
||||
|
||||
producerConnection = createConnection();
|
||||
producerConnection.start();
|
||||
LOG.info("producerConnection = " +producerConnection);
|
||||
|
||||
try {Thread.sleep(3000); } catch (Exception e) {}
|
||||
|
||||
Producer producer = new Producer(producerConnection);
|
||||
Thread producerThread = new Thread(producer);
|
||||
producerThread.start();
|
||||
|
||||
// now that everything is running, let's wait for the consumer thread to finish ...
|
||||
consumerThread.join();
|
||||
producer.stop = true;
|
||||
|
||||
if (consumer.getCount() == testSize )
|
||||
LOG.info("test complete .... all messsages consumed!!");
|
||||
else
|
||||
LOG.info("test failed .... Sent " + (testSize / 1) +
|
||||
" messages intended to be consumed ( " + testSize + " total), but only consumed " + consumer.getCount());
|
||||
|
||||
|
||||
assertTrue("Sent " + testSize + " messages intended to be consumed, but only consumed " + consumer.getCount(),
|
||||
(consumer.getCount() == testSize ));
|
||||
assertFalse("Delivery of messages to consumer was halted during this test", consumer.deliveryHalted());
|
||||
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check if a single consumer with a JMS selector will receive all intended messages
|
||||
*
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
public void testDiscriminatingConsumer() throws Exception {
|
||||
|
||||
consumerConnection = createConnection();
|
||||
consumerConnection.start();
|
||||
LOG.info("consumerConnection = " +consumerConnection);
|
||||
|
||||
try {Thread.sleep(1000); } catch (Exception e) {}
|
||||
|
||||
// here we pass the JMS selector we intend to consume
|
||||
Consumer consumer = new Consumer(consumerConnection, JMSTYPE_EATME);
|
||||
Thread consumerThread = new Thread(consumer);
|
||||
|
||||
consumerThread.start();
|
||||
|
||||
producerConnection = createConnection();
|
||||
producerConnection.start();
|
||||
LOG.info("producerConnection = " +producerConnection);
|
||||
|
||||
try {Thread.sleep(3000); } catch (Exception e) {}
|
||||
|
||||
Producer producer = new Producer(producerConnection);
|
||||
Thread producerThread = new Thread(producer);
|
||||
producerThread.start();
|
||||
|
||||
// now that everything is running, let's wait for the consumer thread to finish ...
|
||||
consumerThread.join();
|
||||
producer.stop = true;
|
||||
|
||||
if (consumer.getCount() == (testSize / 2))
|
||||
{
|
||||
LOG.info("test complete .... all messsages consumed!!");
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.info("test failed .... Sent " + testSize + " original messages, only half of which (" + (testSize / 2) +
|
||||
") were intended to be consumed: consumer paused at: " + consumer.getCount());
|
||||
//System.out.println("test failed .... Sent " + testSize + " original messages, only half of which (" + (testSize / 2) +
|
||||
// ") were intended to be consumed: consumer paused at: " + consumer.getCount());
|
||||
|
||||
}
|
||||
|
||||
assertTrue("Sent " + testSize + " original messages, only half of which (" + (testSize / 2) +
|
||||
") were intended to be consumed: consumer paused at: " + consumer.getCount(),
|
||||
(consumer.getCount() == (testSize / 2)));
|
||||
assertTrue("Delivery of messages to consumer was halted during this test as it only wants half", consumer.deliveryHalted());
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class that will publish 2 * testSize messages. The messages will be distributed evenly
|
||||
* between the following two JMS types:
|
||||
*
|
||||
* @see JMSTYPE_INTENDED_FOR_CONSUMPTION
|
||||
* @see JMSTYPE_NOT_INTENDED_FOR_CONSUMPTION
|
||||
*
|
||||
* @author jlyons
|
||||
*
|
||||
*/
|
||||
private class Producer extends Thread
|
||||
{
|
||||
private int counterSent = 0;
|
||||
private Connection connection = null;
|
||||
public boolean stop = false;
|
||||
|
||||
public Producer(Connection connection)
|
||||
{
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final Queue queue = session.createQueue("test");
|
||||
|
||||
// wait for 10 seconds to allow consumer.receive to be run
|
||||
// first
|
||||
Thread.sleep(10000);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
while (!stop && (counterSent < testSize))
|
||||
{
|
||||
// first send a message intended to be consumed ....
|
||||
TextMessage message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ...
|
||||
message.setJMSType(JMSTYPE_EATME);
|
||||
//LOG.info("sending .... JMSType = " + message.getJMSType());
|
||||
producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
|
||||
|
||||
counterSent++;
|
||||
|
||||
// now send a message intended to be consumed by some other consumer in the the future
|
||||
// ... we expect these messages to accrue in the queue
|
||||
message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ...
|
||||
message.setJMSType(JMSTYPE_IGNOREME);
|
||||
//LOG.info("sending .... JMSType = " + message.getJMSType());
|
||||
producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
|
||||
|
||||
counterSent++;
|
||||
}
|
||||
|
||||
session.close();
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
LOG.info("producer thread complete ... " + counterSent + " messages sent to the queue");
|
||||
}
|
||||
|
||||
public int getCount()
|
||||
{
|
||||
return this.counterSent;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class that will consume messages from the queue based on the supplied JMS selector.
|
||||
* Thread will stop after the first receive(..) timeout, or once all expected messages have
|
||||
* been received (see testSize). If the thread stops due to a timeout, it is experiencing the
|
||||
* delivery pause that is symptomatic of a bug in the broker.
|
||||
*
|
||||
* @author jlyons
|
||||
*
|
||||
*/
|
||||
private class Consumer extends Thread
|
||||
{
|
||||
protected int counterReceived = 0;
|
||||
private Connection connection = null;
|
||||
private String jmsSelector = null;
|
||||
private boolean deliveryHalted = false;
|
||||
|
||||
public Consumer(Connection connection, String jmsSelector)
|
||||
{
|
||||
this.connection = connection;
|
||||
this.jmsSelector = jmsSelector;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
boolean testComplete = false;
|
||||
try {
|
||||
Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final Queue queue = session.createQueue("test");
|
||||
MessageConsumer consumer = null;
|
||||
if (null != this.jmsSelector)
|
||||
{
|
||||
consumer = session.createConsumer(queue, "JMSType='" + this.jmsSelector + "'");
|
||||
}
|
||||
else
|
||||
{
|
||||
consumer = session.createConsumer(queue);
|
||||
}
|
||||
|
||||
while (!deliveryHalted && (counterReceived < testSize))
|
||||
{
|
||||
TextMessage result = (TextMessage) consumer.receive(30000);
|
||||
if (result != null) {
|
||||
counterReceived++;
|
||||
//System.out.println("consuming .... JMSType = " + result.getJMSType() + " received = " + counterReceived);
|
||||
LOG.info("consuming .... JMSType = " + result.getJMSType() + " received = " + counterReceived);
|
||||
} else
|
||||
{
|
||||
LOG.info("consuming .... timeout while waiting for a message ... broker must have stopped delivery ... received = " + counterReceived);
|
||||
deliveryHalted = true;
|
||||
}
|
||||
}
|
||||
session.close();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public int getCount()
|
||||
{
|
||||
return this.counterReceived;
|
||||
}
|
||||
|
||||
public boolean deliveryHalted()
|
||||
{
|
||||
return this.deliveryHalted;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue