diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 0d771caafa..44c85627c8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -77,7 +77,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i if (!isDuplicate(message.getMessageId())) { if (!cached) { message.setRegionDestination(regionDestination); - message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); + if( message.getMemoryUsage()==null ) { + message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); + } } message.incrementReferenceCount(); batchList.put(message.getMessageId(), message); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 059e8678ed..e87567a391 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -58,6 +58,7 @@ public class MemoryMessageStore implements MessageStore { synchronized (messageTable) { messageTable.put(message.getMessageId(), message); } + message.incrementReferenceCount(); } // public void addMessageReference(ConnectionContext context,MessageId @@ -82,7 +83,10 @@ public class MemoryMessageStore implements MessageStore { public void removeMessage(MessageId msgId) throws IOException { synchronized (messageTable) { - messageTable.remove(msgId); + Message removed = messageTable.remove(msgId); + if( removed !=null ) { + removed.decrementReferenceCount(); + } if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) { lastBatchId = null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java index 13a3a24f8b..c38cea2bf3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java @@ -34,14 +34,23 @@ class MemoryTopicSub { private Map map = new LinkedHashMap(); private MessageId lastBatch; - synchronized void addMessage(MessageId id, Message message) { - map.put(id, message); + void addMessage(MessageId id, Message message) { + synchronized(this) { + map.put(id, message); + } + message.incrementReferenceCount(); } - synchronized void removeMessage(MessageId id) { - map.remove(id); - if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) { - resetBatching(); + void removeMessage(MessageId id) { + Message removed; + synchronized(this) { + removed = map.remove(id); + if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) { + resetBatching(); + } + } + if( removed!=null ) { + removed.decrementReferenceCount(); } }