This commit is contained in:
Dejan Bosanac 2014-04-16 15:44:02 +02:00
parent 7646526c0a
commit e947927511
5 changed files with 75 additions and 56 deletions

View File

@ -109,7 +109,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
protected PendingMessageCursor messages; protected PendingMessageCursor messages;
private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>(); private final PendingList pagedInMessages = new OrderedPendingList();
// Messages that are paged in but have not yet been targeted at a subscription // Messages that are paged in but have not yet been targeted at a subscription
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
protected PendingList pagedInPendingDispatch = new OrderedPendingList(); protected PendingList pagedInPendingDispatch = new OrderedPendingList();
@ -1188,44 +1188,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
pageInMessages(!memoryUsage.isFull(110)); pageInMessages(!memoryUsage.isFull(110));
}; };
List<MessageReference> toExpire = new ArrayList<MessageReference>(); doBrowseList(browseList, max, pagedInPendingDispatch, pagedInPendingDispatchLock, connectionContext, "pagedInPendingDispatch");
doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages");
pagedInPendingDispatchLock.writeLock().lock();
try {
addAll(pagedInPendingDispatch.values(), browseList, max, toExpire);
for (MessageReference ref : toExpire) {
pagedInPendingDispatch.remove(ref);
if (broker.isExpired(ref)) {
LOG.debug("expiring from pagedInPending: {}", ref);
messageExpired(connectionContext, ref);
} else {
ref.decrementReferenceCount();
}
}
} finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
toExpire.clear();
pagedInMessagesLock.readLock().lock();
try {
addAll(pagedInMessages.values(), browseList, max, toExpire);
} finally {
pagedInMessagesLock.readLock().unlock();
}
for (MessageReference ref : toExpire) {
if (broker.isExpired(ref)) {
LOG.debug("expiring from pagedInMessages: {}", ref);
messageExpired(connectionContext, ref);
} else {
pagedInMessagesLock.writeLock().lock();
try {
pagedInMessages.remove(ref.getMessageId());
} finally {
pagedInMessagesLock.writeLock().unlock();
}
ref.decrementReferenceCount();
}
}
// we need a store iterator to walk messages on disk, independent of the cursor which is tracking // we need a store iterator to walk messages on disk, independent of the cursor which is tracking
// the next message batch // the next message batch
@ -1234,6 +1198,30 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
protected void doBrowseList(List<Message> browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name) throws Exception {
List<MessageReference> toExpire = new ArrayList<MessageReference>();
lock.readLock().lock();
try {
addAll(list.values(), browseList, max, toExpire);
} finally {
lock.readLock().unlock();
}
for (MessageReference ref : toExpire) {
if (broker.isExpired(ref)) {
LOG.debug("expiring from {}: {}", name, ref);
messageExpired(connectionContext, ref);
} else {
lock.writeLock().lock();
try {
list.remove(ref);
} finally {
lock.writeLock().unlock();
}
ref.decrementReferenceCount();
}
}
}
private boolean shouldPageInMoreForBrowse(int max) { private boolean shouldPageInMoreForBrowse(int max) {
int alreadyPagedIn = 0; int alreadyPagedIn = 0;
pagedInMessagesLock.readLock().lock(); pagedInMessagesLock.readLock().lock();
@ -1264,7 +1252,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
MessageId msgId = new MessageId(id); MessageId msgId = new MessageId(id);
pagedInMessagesLock.readLock().lock(); pagedInMessagesLock.readLock().lock();
try { try {
QueueMessageReference ref = this.pagedInMessages.get(msgId); QueueMessageReference ref = (QueueMessageReference)this.pagedInMessages.get(msgId);
if (ref != null) { if (ref != null) {
return ref; return ref;
} }
@ -1535,7 +1523,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception { ActiveMQDestination dest, int maximumMessages) throws Exception {
int movedCounter = 0; int movedCounter = 0;
Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>(); Set<MessageReference> set = new LinkedHashSet<MessageReference>();
do { do {
doPageIn(true); doPageIn(true);
pagedInMessagesLock.readLock().lock(); pagedInMessagesLock.readLock().lock();
@ -1544,11 +1532,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} finally { } finally {
pagedInMessagesLock.readLock().unlock(); pagedInMessagesLock.readLock().unlock();
} }
List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set); List<MessageReference> list = new ArrayList<MessageReference>(set);
for (QueueMessageReference ref : list) { for (MessageReference ref : list) {
if (filter.evaluate(context, ref)) { if (filter.evaluate(context, ref)) {
// We should only move messages that can be locked. // We should only move messages that can be locked.
moveMessageTo(context, ref, dest); moveMessageTo(context, (QueueMessageReference)ref, dest);
set.remove(ref); set.remove(ref);
if (++movedCounter >= maximumMessages && maximumMessages > 0) { if (++movedCounter >= maximumMessages && maximumMessages > 0) {
return movedCounter; return movedCounter;
@ -1564,7 +1552,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
throw new Exception("Retry of message is only possible on Dead Letter Queues!"); throw new Exception("Retry of message is only possible on Dead Letter Queues!");
} }
int restoredCounter = 0; int restoredCounter = 0;
Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>(); Set<MessageReference> set = new LinkedHashSet<MessageReference>();
do { do {
doPageIn(true); doPageIn(true);
pagedInMessagesLock.readLock().lock(); pagedInMessagesLock.readLock().lock();
@ -1573,11 +1561,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} finally { } finally {
pagedInMessagesLock.readLock().unlock(); pagedInMessagesLock.readLock().unlock();
} }
List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set); List<MessageReference> list = new ArrayList<MessageReference>(set);
for (QueueMessageReference ref : list) { for (MessageReference ref : list) {
if (ref.getMessage().getOriginalDestination() != null) { if (ref.getMessage().getOriginalDestination() != null) {
moveMessageTo(context, ref, ref.getMessage().getOriginalDestination()); moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination());
set.remove(ref); set.remove(ref);
if (++restoredCounter >= maximumMessages && maximumMessages > 0) { if (++restoredCounter >= maximumMessages && maximumMessages > 0) {
return restoredCounter; return restoredCounter;
@ -1672,10 +1660,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
if (hasBrowsers) { if (hasBrowsers) {
ArrayList<QueueMessageReference> alreadyDispatchedMessages = null; ArrayList<MessageReference> alreadyDispatchedMessages = null;
pagedInMessagesLock.readLock().lock(); pagedInMessagesLock.readLock().lock();
try{ try{
alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values()); alreadyDispatchedMessages = new ArrayList<MessageReference>(pagedInMessages.values());
}finally { }finally {
pagedInMessagesLock.readLock().unlock(); pagedInMessagesLock.readLock().unlock();
} }
@ -1691,8 +1679,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size()); LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size());
boolean added = false; boolean added = false;
for (QueueMessageReference node : alreadyDispatchedMessages) { for (MessageReference node : alreadyDispatchedMessages) {
if (!node.isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) { if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
msgContext.setMessageReference(node); msgContext.setMessageReference(node);
if (browser.matches(node, msgContext)) { if (browser.matches(node, msgContext)) {
browser.add(node); browser.add(node);
@ -1830,7 +1818,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
destinationStatistics.getMessages().decrement(); destinationStatistics.getMessages().decrement();
pagedInMessagesLock.writeLock().lock(); pagedInMessagesLock.writeLock().lock();
try { try {
pagedInMessages.remove(reference.getMessageId()); pagedInMessages.remove(reference);
} finally { } finally {
pagedInMessagesLock.writeLock().unlock(); pagedInMessagesLock.writeLock().unlock();
} }
@ -1996,8 +1984,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
resultList = new OrderedPendingList(); resultList = new OrderedPendingList();
} }
for (QueueMessageReference ref : result) { for (QueueMessageReference ref : result) {
if (!pagedInMessages.containsKey(ref.getMessageId())) { if (!pagedInMessages.contains(ref)) {
pagedInMessages.put(ref.getMessageId(), ref); pagedInMessages.addMessageLast(ref);
resultList.addMessageLast(ref); resultList.addMessageLast(ref);
} else { } else {
ref.decrementReferenceCount(); ref.decrementReferenceCount();
@ -2260,7 +2248,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (message == null) { if (message == null) {
pagedInMessagesLock.readLock().lock(); pagedInMessagesLock.readLock().lock();
try { try {
message = pagedInMessages.get(messageId); message = (QueueMessageReference)pagedInMessages.get(messageId);
} finally { } finally {
pagedInMessagesLock.readLock().unlock(); pagedInMessagesLock.readLock().unlock();
} }

View File

@ -164,4 +164,13 @@ public class OrderedPendingList implements PendingList {
} }
} }
} }
@Override
public MessageReference get(MessageId messageId) {
PendingNode node = map.get(messageId);
if (node != null) {
return node.getMessage();
}
return null;
}
} }

View File

@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
public interface PendingList extends Iterable<MessageReference> { public interface PendingList extends Iterable<MessageReference> {
@ -108,4 +109,6 @@ public interface PendingList extends Iterable<MessageReference> {
* The PendingList that is to be added to this collection. * The PendingList that is to be added to this collection.
*/ */
public void addAll(PendingList pendingList); public void addAll(PendingList pendingList);
public MessageReference get(MessageId messageId);
} }

View File

@ -156,4 +156,13 @@ public class PrioritizedPendingList implements PendingList {
} }
} }
@Override
public MessageReference get(MessageId messageId) {
PendingNode node = map.get(messageId);
if (node != null) {
return node.getMessage();
}
return null;
}
} }

View File

@ -329,6 +329,16 @@ public class OrderPendingListTest {
theList.add(messageReference); theList.add(messageReference);
} }
} }
@Override
public MessageReference get(MessageId messageId) {
for(MessageReference messageReference : theList) {
if (messageReference.getMessageId().equals(messageId)) {
return messageReference;
}
}
return null;
}
} }
static class TestMessageReference implements MessageReference { static class TestMessageReference implements MessageReference {