mirror of https://github.com/apache/activemq.git
log when discarding messages - and ensure we don't get npe
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382692 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
057cde18f9
commit
2867249482
|
@ -19,10 +19,8 @@ package org.apache.activemq.broker.region;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
|
||||||
import javax.jms.InvalidSelectorException;
|
import javax.jms.InvalidSelectorException;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
import org.apache.activemq.broker.Broker;
|
import org.apache.activemq.broker.Broker;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
@ -34,9 +32,11 @@ import org.apache.activemq.command.MessageDispatch;
|
||||||
import org.apache.activemq.command.MessageDispatchNotification;
|
import org.apache.activemq.command.MessageDispatchNotification;
|
||||||
import org.apache.activemq.memory.UsageManager;
|
import org.apache.activemq.memory.UsageManager;
|
||||||
import org.apache.activemq.transaction.Synchronization;
|
import org.apache.activemq.transaction.Synchronization;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
public class TopicSubscription extends AbstractSubscription {
|
public class TopicSubscription extends AbstractSubscription {
|
||||||
|
private static final Log log = LogFactory.getLog(TopicSubscription.class);
|
||||||
final protected LinkedList matched = new LinkedList();
|
final protected LinkedList matched = new LinkedList();
|
||||||
final protected ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
|
final protected ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
|
||||||
final protected UsageManager usageManager;
|
final protected UsageManager usageManager;
|
||||||
|
@ -62,8 +62,9 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
|
|
||||||
// NOTE - be careful about the slaveBroker!
|
// NOTE - be careful about the slaveBroker!
|
||||||
if (maximumPendingMessages > 0) {
|
if (maximumPendingMessages > 0) {
|
||||||
|
log.warn("discarding " + (matched.size() - maximumPendingMessages) + " messages for slow consumer");
|
||||||
// lets discard old messages as we are a slow consumer
|
// lets discard old messages as we are a slow consumer
|
||||||
while (matched.size() > maximumPendingMessages) {
|
while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
|
||||||
MessageReference oldMessage = (MessageReference) matched.removeFirst();
|
MessageReference oldMessage = (MessageReference) matched.removeFirst();
|
||||||
oldMessage.decrementReferenceCount();
|
oldMessage.decrementReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue