diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index 53c3e6000f..6ae1207b3a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -38,7 +38,7 @@ public interface Destination extends Service { void removeSubscription(ConnectionContext context, Subscription sub) throws Exception; void send(ConnectionContext context, Message messageSend) throws Exception; - boolean lock(MessageReference node, Subscription subscription); + boolean lock(MessageReference node, LockOwner lockOwner); void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException; void gc(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 83f2f04ea8..39b41209e2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -48,7 +48,7 @@ public class IndirectMessageReference implements MessageReference { /** The number of times the message has been delivered.*/ private short redeliveryCounter = 0; /** The subscription that has locked the message */ - private Subscription lockOwner; + private LockOwner lockOwner; /** Has the message been dropped? */ private boolean dropped; /** Has the message been acked? */ @@ -148,7 +148,7 @@ public class IndirectMessageReference implements MessageReference { } } - public boolean lock(Subscription subscription) { + public boolean lock(LockOwner subscription) { if( !regionDestination.lock(this, subscription) ) return false; synchronized (this) { @@ -163,7 +163,7 @@ public class IndirectMessageReference implements MessageReference { lockOwner = null; } - synchronized public Subscription getLockOwner() { + synchronized public LockOwner getLockOwner() { return lockOwner; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/LockOwner.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/LockOwner.java new file mode 100644 index 0000000000..d176c36b89 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/LockOwner.java @@ -0,0 +1,33 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +public interface LockOwner { + + public static final LockOwner HIGH_PRIORITY_LOCK_OWNER = new LockOwner() { + public int getLockPriority() { + return Integer.MAX_VALUE; + } + public boolean isLockExclusive() { + return false; + } + }; + + int getLockPriority(); + boolean isLockExclusive(); + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 96fa4c64e3..7be5503213 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -65,7 +65,7 @@ public class Queue implements Destination { protected final UsageManager usageManager; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); - private Subscription exclusiveOwner; + private LockOwner exclusiveOwner; private MessageGroupMap messageGroupOwners; private int messageGroupHashBucketCount = 1024; @@ -106,15 +106,15 @@ public class Queue implements Destination { } } - public synchronized boolean lock(MessageReference node, Subscription sub) { - if (exclusiveOwner == sub) + public synchronized boolean lock(MessageReference node, LockOwner lockOwner) { + if (exclusiveOwner == lockOwner) return true; if (exclusiveOwner != null) return false; - if (sub.getConsumerInfo().getPriority() != highestSubscriptionPriority) + if (lockOwner.getLockPriority() < highestSubscriptionPriority) return false; - if (sub.getConsumerInfo().isExclusive()) { - exclusiveOwner = sub; + if (lockOwner.isLockExclusive()) { + exclusiveOwner = lockOwner; } return true; } @@ -444,13 +444,18 @@ public class Queue implements Destination { try { IndirectMessageReference r = (IndirectMessageReference) iter.next(); if (messageId.equals(r.getMessageId().toString())) { - MessageAck ack = new MessageAck(); - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); - ack.setDestination(destination); - ack.setMessageID(r.getMessageId()); - acknowledge(c, null, ack, r); - r.drop(); - dropEvent(); + + // We should only delete messages that can be locked. + if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) ) { + MessageAck ack = new MessageAck(); + ack.setAckType(MessageAck.STANDARD_ACK_TYPE); + ack.setDestination(destination); + ack.setMessageID(r.getMessageId()); + acknowledge(c, null, ack, r); + r.drop(); + dropEvent(); + iter.remove(); + } } } catch (IOException e) { } @@ -488,13 +493,18 @@ public class Queue implements Destination { for (Iterator iter = messages.iterator(); iter.hasNext();) { try { IndirectMessageReference r = (IndirectMessageReference) iter.next(); - MessageAck ack = new MessageAck(); - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); - ack.setDestination(destination); - ack.setMessageID(r.getMessageId()); - acknowledge(c, null, ack, r); - r.drop(); - dropEvent(); + + // We should only delete messages that can be locked. + if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) ) { + MessageAck ack = new MessageAck(); + ack.setAckType(MessageAck.STANDARD_ACK_TYPE); + ack.setDestination(destination); + ack.setMessageID(r.getMessageId()); + acknowledge(c, null, ack, r); + r.drop(); + dropEvent(); + iter.remove(); + } } catch (IOException e) { } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 61d606f711..02e097cad4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -28,7 +28,7 @@ import javax.jms.InvalidSelectorException; import java.io.IOException; -public class QueueSubscription extends PrefetchSubscription { +public class QueueSubscription extends PrefetchSubscription implements LockOwner { public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { super(broker,context, info); @@ -130,4 +130,12 @@ public class QueueSubscription extends PrefetchSubscription { ", matched="+this.matched.size(); } + public int getLockPriority() { + return info.getPriority(); + } + + public boolean isLockExclusive() { + return info.isExclusive(); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 7c28129610..90c8e732b3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -77,7 +77,7 @@ public class Topic implements Destination { this.destinationStatistics.setParent(parentStats); } - public boolean lock(MessageReference node, Subscription sub) { + public boolean lock(MessageReference node, LockOwner sub) { return true; }