- Improved the delete and purge operations on a queue.

- Messages now removed from the message list: fixes message count < 0 problem
  - Messages are now locked before they are deleted, if messages are in flight to a consumer, we don't want to delete them.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382919 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-03 20:07:07 +00:00
parent 9c5680c9f6
commit 44b00e9309
6 changed files with 77 additions and 26 deletions

View File

@ -38,7 +38,7 @@ public interface Destination extends Service {
void removeSubscription(ConnectionContext context, Subscription sub) throws Exception; void removeSubscription(ConnectionContext context, Subscription sub) throws Exception;
void send(ConnectionContext context, Message messageSend) throws Exception; void send(ConnectionContext context, Message messageSend) throws Exception;
boolean lock(MessageReference node, Subscription subscription); boolean lock(MessageReference node, LockOwner lockOwner);
void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException; void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
void gc(); void gc();

View File

@ -48,7 +48,7 @@ public class IndirectMessageReference implements MessageReference {
/** The number of times the message has been delivered.*/ /** The number of times the message has been delivered.*/
private short redeliveryCounter = 0; private short redeliveryCounter = 0;
/** The subscription that has locked the message */ /** The subscription that has locked the message */
private Subscription lockOwner; private LockOwner lockOwner;
/** Has the message been dropped? */ /** Has the message been dropped? */
private boolean dropped; private boolean dropped;
/** Has the message been acked? */ /** Has the message been acked? */
@ -148,7 +148,7 @@ public class IndirectMessageReference implements MessageReference {
} }
} }
public boolean lock(Subscription subscription) { public boolean lock(LockOwner subscription) {
if( !regionDestination.lock(this, subscription) ) if( !regionDestination.lock(this, subscription) )
return false; return false;
synchronized (this) { synchronized (this) {
@ -163,7 +163,7 @@ public class IndirectMessageReference implements MessageReference {
lockOwner = null; lockOwner = null;
} }
synchronized public Subscription getLockOwner() { synchronized public LockOwner getLockOwner() {
return lockOwner; return lockOwner;
} }

View File

@ -0,0 +1,33 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
public interface LockOwner {
public static final LockOwner HIGH_PRIORITY_LOCK_OWNER = new LockOwner() {
public int getLockPriority() {
return Integer.MAX_VALUE;
}
public boolean isLockExclusive() {
return false;
}
};
int getLockPriority();
boolean isLockExclusive();
}

View File

@ -65,7 +65,7 @@ public class Queue implements Destination {
protected final UsageManager usageManager; protected final UsageManager usageManager;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private Subscription exclusiveOwner; private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
private int messageGroupHashBucketCount = 1024; private int messageGroupHashBucketCount = 1024;
@ -106,15 +106,15 @@ public class Queue implements Destination {
} }
} }
public synchronized boolean lock(MessageReference node, Subscription sub) { public synchronized boolean lock(MessageReference node, LockOwner lockOwner) {
if (exclusiveOwner == sub) if (exclusiveOwner == lockOwner)
return true; return true;
if (exclusiveOwner != null) if (exclusiveOwner != null)
return false; return false;
if (sub.getConsumerInfo().getPriority() != highestSubscriptionPriority) if (lockOwner.getLockPriority() < highestSubscriptionPriority)
return false; return false;
if (sub.getConsumerInfo().isExclusive()) { if (lockOwner.isLockExclusive()) {
exclusiveOwner = sub; exclusiveOwner = lockOwner;
} }
return true; return true;
} }
@ -444,13 +444,18 @@ public class Queue implements Destination {
try { try {
IndirectMessageReference r = (IndirectMessageReference) iter.next(); IndirectMessageReference r = (IndirectMessageReference) iter.next();
if (messageId.equals(r.getMessageId().toString())) { if (messageId.equals(r.getMessageId().toString())) {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE); // We should only delete messages that can be locked.
ack.setDestination(destination); if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) ) {
ack.setMessageID(r.getMessageId()); MessageAck ack = new MessageAck();
acknowledge(c, null, ack, r); ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
r.drop(); ack.setDestination(destination);
dropEvent(); ack.setMessageID(r.getMessageId());
acknowledge(c, null, ack, r);
r.drop();
dropEvent();
iter.remove();
}
} }
} catch (IOException e) { } catch (IOException e) {
} }
@ -488,13 +493,18 @@ public class Queue implements Destination {
for (Iterator iter = messages.iterator(); iter.hasNext();) { for (Iterator iter = messages.iterator(); iter.hasNext();) {
try { try {
IndirectMessageReference r = (IndirectMessageReference) iter.next(); IndirectMessageReference r = (IndirectMessageReference) iter.next();
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE); // We should only delete messages that can be locked.
ack.setDestination(destination); if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) ) {
ack.setMessageID(r.getMessageId()); MessageAck ack = new MessageAck();
acknowledge(c, null, ack, r); ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
r.drop(); ack.setDestination(destination);
dropEvent(); ack.setMessageID(r.getMessageId());
acknowledge(c, null, ack, r);
r.drop();
dropEvent();
iter.remove();
}
} catch (IOException e) { } catch (IOException e) {
} }
} }

View File

@ -28,7 +28,7 @@ import javax.jms.InvalidSelectorException;
import java.io.IOException; import java.io.IOException;
public class QueueSubscription extends PrefetchSubscription { public class QueueSubscription extends PrefetchSubscription implements LockOwner {
public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info); super(broker,context, info);
@ -130,4 +130,12 @@ public class QueueSubscription extends PrefetchSubscription {
", matched="+this.matched.size(); ", matched="+this.matched.size();
} }
public int getLockPriority() {
return info.getPriority();
}
public boolean isLockExclusive() {
return info.isExclusive();
}
} }

View File

@ -77,7 +77,7 @@ public class Topic implements Destination {
this.destinationStatistics.setParent(parentStats); this.destinationStatistics.setParent(parentStats);
} }
public boolean lock(MessageReference node, Subscription sub) { public boolean lock(MessageReference node, LockOwner sub) {
return true; return true;
} }