mirror of
https://github.com/apache/activemq.git
synced 2025-02-16 23:16:52 +00:00
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1490745 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b8518e0b91
commit
0b8b230983
@ -17,8 +17,9 @@
|
|||||||
package org.apache.activemq.broker.region;
|
package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import javax.jms.InvalidSelectorException;
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
import org.apache.activemq.broker.Broker;
|
import org.apache.activemq.broker.Broker;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.region.group.MessageGroupMap;
|
import org.apache.activemq.broker.region.group.MessageGroupMap;
|
||||||
@ -41,24 +42,28 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
|
|||||||
/**
|
/**
|
||||||
* In the queue case, mark the node as dropped and then a gc cycle will
|
* In the queue case, mark the node as dropped and then a gc cycle will
|
||||||
* remove it from the queue.
|
* remove it from the queue.
|
||||||
*
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
|
protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
|
||||||
final Destination q = (Destination) n.getRegionDestination();
|
final Destination q = (Destination) n.getRegionDestination();
|
||||||
final QueueMessageReference node = (QueueMessageReference)n;
|
final QueueMessageReference node = (QueueMessageReference)n;
|
||||||
final Queue queue = (Queue)q;
|
final Queue queue = (Queue)q;
|
||||||
|
|
||||||
if (n.isExpired()) {
|
if (n.isExpired()) {
|
||||||
// sync with message expiry processing
|
// sync with message expiry processing
|
||||||
if (!broker.isExpired(n)) {
|
if (!broker.isExpired(n)) {
|
||||||
LOG.warn("ignoring ack " + ack + ", for already expired message: " + n);
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("ignoring ack {}, for already expired message: {}", ack, n);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
queue.removeMessage(context, this, node, ack);
|
queue.removeMessage(context, this, node, ack);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected boolean canDispatch(MessageReference n) throws IOException {
|
protected boolean canDispatch(MessageReference n) throws IOException {
|
||||||
boolean result = true;
|
boolean result = true;
|
||||||
QueueMessageReference node = (QueueMessageReference)n;
|
QueueMessageReference node = (QueueMessageReference)n;
|
||||||
@ -86,26 +91,31 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized String toString() {
|
public synchronized String toString() {
|
||||||
return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
|
return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
|
||||||
+ this.prefetchExtension + ", pending=" + getPendingQueueSize();
|
+ this.prefetchExtension + ", pending=" + getPendingQueueSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getLockPriority() {
|
public int getLockPriority() {
|
||||||
return info.getPriority();
|
return info.getPriority();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isLockExclusive() {
|
public boolean isLockExclusive() {
|
||||||
return info.isExclusive();
|
return info.isExclusive();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
setSlowConsumer(false);
|
setSlowConsumer(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
protected boolean isDropped(MessageReference node) {
|
protected boolean isDropped(MessageReference node) {
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
if(node instanceof IndirectMessageReference) {
|
if(node instanceof IndirectMessageReference) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user