mirror of https://github.com/apache/activemq.git
Do better reference count management of persistent messages when they are stored in a Memory store.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@644447 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d35f83bcf5
commit
ce1d85d1ee
|
@ -77,7 +77,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
if (!isDuplicate(message.getMessageId())) {
|
if (!isDuplicate(message.getMessageId())) {
|
||||||
if (!cached) {
|
if (!cached) {
|
||||||
message.setRegionDestination(regionDestination);
|
message.setRegionDestination(regionDestination);
|
||||||
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
|
if( message.getMemoryUsage()==null ) {
|
||||||
|
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
message.incrementReferenceCount();
|
message.incrementReferenceCount();
|
||||||
batchList.put(message.getMessageId(), message);
|
batchList.put(message.getMessageId(), message);
|
||||||
|
|
|
@ -58,6 +58,7 @@ public class MemoryMessageStore implements MessageStore {
|
||||||
synchronized (messageTable) {
|
synchronized (messageTable) {
|
||||||
messageTable.put(message.getMessageId(), message);
|
messageTable.put(message.getMessageId(), message);
|
||||||
}
|
}
|
||||||
|
message.incrementReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
// public void addMessageReference(ConnectionContext context,MessageId
|
// public void addMessageReference(ConnectionContext context,MessageId
|
||||||
|
@ -82,7 +83,10 @@ public class MemoryMessageStore implements MessageStore {
|
||||||
|
|
||||||
public void removeMessage(MessageId msgId) throws IOException {
|
public void removeMessage(MessageId msgId) throws IOException {
|
||||||
synchronized (messageTable) {
|
synchronized (messageTable) {
|
||||||
messageTable.remove(msgId);
|
Message removed = messageTable.remove(msgId);
|
||||||
|
if( removed !=null ) {
|
||||||
|
removed.decrementReferenceCount();
|
||||||
|
}
|
||||||
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
|
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
|
||||||
lastBatchId = null;
|
lastBatchId = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,14 +34,23 @@ class MemoryTopicSub {
|
||||||
private Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
|
private Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
|
||||||
private MessageId lastBatch;
|
private MessageId lastBatch;
|
||||||
|
|
||||||
synchronized void addMessage(MessageId id, Message message) {
|
void addMessage(MessageId id, Message message) {
|
||||||
map.put(id, message);
|
synchronized(this) {
|
||||||
|
map.put(id, message);
|
||||||
|
}
|
||||||
|
message.incrementReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void removeMessage(MessageId id) {
|
void removeMessage(MessageId id) {
|
||||||
map.remove(id);
|
Message removed;
|
||||||
if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) {
|
synchronized(this) {
|
||||||
resetBatching();
|
removed = map.remove(id);
|
||||||
|
if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) {
|
||||||
|
resetBatching();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if( removed!=null ) {
|
||||||
|
removed.decrementReferenceCount();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue