git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1403073 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-10-28 18:58:06 +00:00
parent f0b3f1c6e0
commit afe73c1861
2 changed files with 181 additions and 24 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
@ -38,6 +39,7 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
@ -54,6 +56,7 @@ public class TopicSubscription extends AbstractSubscription {
boolean singleDestination = true;
Destination destination;
private final Scheduler scheduler;
private int maximumPendingMessages = -1;
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
@ -61,6 +64,7 @@ public class TopicSubscription extends AbstractSubscription {
private final Object matchedListMutex = new Object();
private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0);
private final AtomicBoolean prefetchWindowOpen = new AtomicBoolean(false);
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
protected int maxProducersToAudit = 1024;
@ -78,6 +82,8 @@ public class TopicSubscription extends AbstractSubscription {
} else {
this.matched = new FilePendingMessageCursor(broker,matchedName,false);
}
this.scheduler = broker.getScheduler();
}
public void init() throws Exception {
@ -95,7 +101,7 @@ public class TopicSubscription extends AbstractSubscription {
return;
}
enqueueCounter.incrementAndGet();
if (!isFull() && matched.isEmpty() && !isSlave()) {
if (!isFull() && matched.isEmpty() && !isSlave()) {
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
dispatch(node);
@ -291,10 +297,52 @@ public class TopicSubscription extends AbstractSubscription {
}
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
// not supported for topics
// The slave should not deliver pull messages.
if (getPrefetchSize() == 0 && !isSlave()) {
prefetchWindowOpen.set(true);
dispatchMatched();
// If there was nothing dispatched.. we may need to setup a timeout.
if (prefetchWindowOpen.get()) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
prefetchWindowOpen.set(false);
// Send a NULL message to signal nothing pending.
dispatch(null);
}
if (pull.getTimeout() > 0) {
scheduler.executeAfterDelay(new Runnable() {
public void run() {
pullTimeout();
}
}, pull.getTimeout());
}
}
}
return null;
}
/**
* Occurs when a pull times out. If nothing has been dispatched since the
* timeout was setup, then send the NULL message.
*/
private final void pullTimeout() {
synchronized (matchedListMutex) {
if (prefetchWindowOpen.compareAndSet(true, false)) {
try {
dispatch(null);
} catch (Exception e) {
context.getConnection().serviceException(e);
}
}
}
}
public int getPendingQueueSize() {
return matched();
}
@ -395,7 +443,7 @@ public class TopicSubscription extends AbstractSubscription {
// Implementation methods
// -------------------------------------------------------------------------
public boolean isFull() {
return getDispatchedQueueSize() >= info.getPrefetchSize();
return getDispatchedQueueSize() >= info.getPrefetchSize() && !prefetchWindowOpen.get();
}
public int getInFlightSize() {
@ -482,6 +530,7 @@ public class TopicSubscription extends AbstractSubscription {
continue; // just drop it.
}
dispatch(message);
prefetchWindowOpen.set(false);
}
} finally {
matched.release();
@ -492,38 +541,46 @@ public class TopicSubscription extends AbstractSubscription {
private void dispatch(final MessageReference node) throws IOException {
Message message = (Message)node;
node.incrementReferenceCount();
if (node != null) {
node.incrementReferenceCount();
}
// Make sure we can dispatch a message.
MessageDispatch md = new MessageDispatch();
md.setMessage(message);
md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination());
dispatchedCounter.incrementAndGet();
// Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) {
if (destination == null) {
destination = node.getRegionDestination();
} else {
if (destination != node.getRegionDestination()) {
singleDestination = false;
if (node != null) {
md.setDestination(node.getRegionDestination().getActiveMQDestination());
dispatchedCounter.incrementAndGet();
// Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) {
if (destination == null) {
destination = node.getRegionDestination();
} else {
if (destination != node.getRegionDestination()) {
singleDestination = false;
}
}
}
}
if (info.isDispatchAsync()) {
md.setTransmitCallback(new Runnable() {
public void run() {
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount();
}
});
if (node != null) {
md.setTransmitCallback(new Runnable() {
@Override
public void run() {
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount();
}
});
}
context.getConnection().dispatchAsync(md);
} else {
context.getConnection().dispatchSync(md);
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount();
if (node != null) {
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount();
}
}
}

View File

@ -0,0 +1,100 @@
package org.apache.activemq.usecases;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TopicSubscriptionZeroPrefetchTest {
private static final String TOPIC_NAME = "slow.consumer";
private Connection connection;
private Session session;
private ActiveMQTopic destination;
private MessageProducer producer;
private MessageConsumer consumer;
private BrokerService brokerService;
@Before
public void setUp() throws Exception {
brokerService = createBroker();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
activeMQConnectionFactory.setWatchTopicAdvisories(true);
connection = activeMQConnectionFactory.createConnection();
connection.setClientID("ClientID-1");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = new ActiveMQTopic(TOPIC_NAME);
producer = session.createProducer(destination);
connection.start();
}
/*
* test non durable topic subscription with prefetch set to zero
*/
@Test(timeout=60000)
public void testTopicConsumerPrefetchZero() throws Exception {
ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
consumer = session.createConsumer(consumerDestination);
// publish messages
Message txtMessage = session.createTextMessage("M");
producer.send(txtMessage);
Message consumedMessage = consumer.receiveNoWait();
Assert.assertNotNull("should have received a message the published message", consumedMessage);
}
/*
* test durable topic subscription with prefetch zero
*/
@Test(timeout=60000)
public void testDurableTopicConsumerPrefetchZero() throws Exception {
ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.prefetchSize=0");
consumer = session.createDurableSubscriber(consumerDestination, "mysub1");
// publish messages
Message txtMessage = session.createTextMessage("M");
producer.send(txtMessage);
Message consumedMessage = consumer.receive(100);
Assert.assertNotNull("should have received a message the published message", consumedMessage);
}
@After
public void tearDown() throws Exception {
consumer.close();
producer.close();
session.close();
connection.close();
brokerService.stop();
}
// helper method to create a broker with slow consumer advisory turned on
private BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("localhost");
broker.setUseJmx(false);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("vm://localhost");
broker.start();
broker.waitUntilStarted();
return broker;
}
}