https://issues.apache.org/jira/browse/AMQ-5718 - don't add messages to subscriber while it's discarding

This commit is contained in:
Dejan Bosanac 2015-04-13 11:16:35 +02:00
parent b29eb384b8
commit 2562cf21a2
2 changed files with 47 additions and 14 deletions

View File

@ -73,6 +73,7 @@ public class TopicSubscription extends AbstractSubscription {
protected boolean enableAudit = false; protected boolean enableAudit = false;
protected ActiveMQMessageAudit audit; protected ActiveMQMessageAudit audit;
protected boolean active = false; protected boolean active = false;
protected boolean discarding = false;
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info); super(broker, context, info);
@ -107,6 +108,11 @@ public class TopicSubscription extends AbstractSubscription {
node = new IndirectMessageReference(node.getMessage()); node = new IndirectMessageReference(node.getMessage());
enqueueCounter.incrementAndGet(); enqueueCounter.incrementAndGet();
synchronized (matchedListMutex) { synchronized (matchedListMutex) {
// if this subscriber is already discarding a message, we don't want to add
// any more messages to it as those messages can only be advisories generated in the process,
// which can trigger the recursive call loop
if (discarding) return;
if (!isFull() && matched.isEmpty()) { if (!isFull() && matched.isEmpty()) {
// if maximumPendingMessages is set we will only discard messages which // if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled) // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
@ -639,18 +645,23 @@ public class TopicSubscription extends AbstractSubscription {
} }
private void discard(MessageReference message) { private void discard(MessageReference message) {
message.decrementReferenceCount(); discarding = true;
matched.remove(message); try {
discarded++; message.decrementReferenceCount();
if(destination != null) { matched.remove(message);
destination.getDestinationStatistics().getDequeues().increment(); discarded++;
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
}
LOG.debug("{}, discarding message {}", this, message);
Destination dest = (Destination) message.getRegionDestination();
if (dest != null) {
dest.messageDiscarded(getContext(), this, message);
}
broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
} finally {
discarding = false;
} }
LOG.debug("{}, discarding message {}", this, message);
Destination dest = (Destination) message.getRegionDestination();
if (dest != null) {
dest.messageDiscarded(getContext(), this, message);
}
broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
} }
@Override @Override

View File

@ -33,11 +33,11 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.*;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
/** /**
* *
@ -161,6 +161,28 @@ public class AdvisoryTests extends TestCase {
assertNotNull(msg); assertNotNull(msg);
} }
public void testMessageDLQd() throws Exception {
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setTopicPrefetch(2);
((ActiveMQConnection)connection).setPrefetchPolicy(policy);
Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = s.createTopic(getClass().getName());
Topic advisoryTopic = s.createTopic(">");
for (int i = 0; i < 100; i++) {
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
}
MessageProducer producer = s.createProducer(topic);
int count = 10;
for (int i = 0; i < count; i++) {
BytesMessage m = s.createBytesMessage();
producer.send(m);
}
// we should get here without StackOverflow
}
public void xtestMessageDiscardedAdvisory() throws Exception { public void xtestMessageDiscardedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = s.createTopic(getClass().getName()); Topic topic = s.createTopic(getClass().getName());