mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3733: Topic subscriber is assumed to be slow consumer when prefetch is set to one. Thanks for the great test case. Fixed up the logic used to determine slowness of a sub to take into account the pending messages and prefetch. It is now only applicable when prefetch > 1 and the pending message strategy keeps messages in memory
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1301565 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eab8758529
commit
1be93984a2
|
@ -99,12 +99,14 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
dispatch(node);
|
||||
setSlowConsumer(false);
|
||||
} else {
|
||||
//we are slow
|
||||
if(!isSlowConsumer()) {
|
||||
LOG.warn(toString() + ": has reached its prefetch limit without an ack, it appears to be slow");
|
||||
setSlowConsumer(true);
|
||||
for (Destination dest: destinations) {
|
||||
dest.slowConsumer(getContext(), this);
|
||||
if ( info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize() ) {
|
||||
//we are slow
|
||||
if(!isSlowConsumer()) {
|
||||
LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
|
||||
setSlowConsumer(true);
|
||||
for (Destination dest: destinations) {
|
||||
dest.slowConsumer(getContext(), this);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (maximumPendingMessages != 0) {
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
package org.apache.activemq.usecases;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.junit.Assert;
|
||||
|
||||
|
||||
/**
|
||||
* Checks to see if "slow consumer advisory messages" are generated when
|
||||
* small number of messages (2) are published to a topic which has a subscriber
|
||||
* with a prefetch of one set.
|
||||
*
|
||||
*/
|
||||
|
||||
public class TopicSubscriptionSlowConsumerTest extends TestCase {
|
||||
|
||||
private static final String TOPIC_NAME = "slow.consumer";
|
||||
Connection connection;
|
||||
private Session session;
|
||||
private ActiveMQTopic destination;
|
||||
private MessageProducer producer;
|
||||
private MessageConsumer consumer;
|
||||
private BrokerService brokerService;
|
||||
|
||||
|
||||
public void setUp() throws Exception {
|
||||
|
||||
brokerService = createBroker();
|
||||
|
||||
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
|
||||
|
||||
activeMQConnectionFactory.setWatchTopicAdvisories(true);
|
||||
connection = activeMQConnectionFactory.createConnection();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
destination = new ActiveMQTopic(TOPIC_NAME);
|
||||
producer = session.createProducer(destination);
|
||||
|
||||
connection.start();
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void testPrefetchValueOne() throws Exception{
|
||||
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME+"?consumer.prefetchSize=1");
|
||||
consumer = session.createConsumer(consumerDestination);
|
||||
|
||||
//add a consumer to the slow consumer advisory topic.
|
||||
ActiveMQTopic slowConsumerAdvisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
|
||||
MessageConsumer slowConsumerAdvisory = session.createConsumer(slowConsumerAdvisoryTopic);
|
||||
|
||||
//publish 2 messages
|
||||
Message txtMessage = session.createTextMessage("Sample Text Message");
|
||||
for(int i= 0; i<2; i++){
|
||||
producer.send(txtMessage);
|
||||
}
|
||||
|
||||
//consume 2 messages
|
||||
for(int i= 0; i<2; i++){
|
||||
Message receivedMsg = consumer.receive(100);
|
||||
Assert.assertNotNull("received msg "+i+" should not be null",receivedMsg);
|
||||
}
|
||||
|
||||
//check for "slow consumer" advisory message
|
||||
Message slowAdvisoryMessage = slowConsumerAdvisory.receive(100);
|
||||
Assert.assertNull( "should not have received a slow consumer advisory message",slowAdvisoryMessage);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void tearDown() throws Exception {
|
||||
consumer.close();
|
||||
producer.close();
|
||||
session.close();
|
||||
connection.close();
|
||||
brokerService.stop();
|
||||
}
|
||||
|
||||
|
||||
//helper method to create a broker with slow consumer advisory turned on
|
||||
private BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setBrokerName("localhost");
|
||||
broker.setUseJmx(true);
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
broker.addConnector("vm://localhost");
|
||||
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
defaultEntry.setAdvisoryForSlowConsumers(true);
|
||||
|
||||
policyMap.setDefaultEntry(defaultEntry);
|
||||
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
return broker;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue