mirror of https://github.com/apache/activemq.git
AMQ-5748 - Cleanup and clearing cache on shutdown
This commit is contained in:
parent
a49d46e3ca
commit
c6542a921b
|
@ -31,6 +31,7 @@ import org.apache.activemq.command.MessageId;
|
|||
import org.apache.activemq.store.IndexListener;
|
||||
import org.apache.activemq.store.MessageRecoveryListener;
|
||||
import org.apache.activemq.store.AbstractMessageStore;
|
||||
import org.apache.activemq.store.MessageStoreStatistics;
|
||||
|
||||
/**
|
||||
* An implementation of {@link org.apache.activemq.store.MessageStore} which
|
||||
|
@ -57,7 +58,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
|
|||
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
|
||||
synchronized (messageTable) {
|
||||
messageTable.put(message.getMessageId(), message);
|
||||
incMessageStoreStatistics(message);
|
||||
incMessageStoreStatistics(getMessageStoreStatistics(), message);
|
||||
}
|
||||
message.incrementReferenceCount();
|
||||
message.getMessageId().setFutureOrSequenceLong(sequenceId++);
|
||||
|
@ -93,7 +94,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
|
|||
Message removed = messageTable.remove(msgId);
|
||||
if( removed !=null ) {
|
||||
removed.decrementReferenceCount();
|
||||
decMessageStoreStatistics(removed);
|
||||
decMessageStoreStatistics(getMessageStoreStatistics(), removed);
|
||||
}
|
||||
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
|
||||
lastBatchId = null;
|
||||
|
@ -198,14 +199,18 @@ public class MemoryMessageStore extends AbstractMessageStore {
|
|||
}
|
||||
}
|
||||
|
||||
protected final void incMessageStoreStatistics(Message message) {
|
||||
getMessageStoreStatistics().getMessageCount().increment();
|
||||
getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
|
||||
protected static final void incMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) {
|
||||
if (stats != null && message != null) {
|
||||
stats.getMessageCount().increment();
|
||||
stats.getMessageSize().addSize(message.getSize());
|
||||
}
|
||||
}
|
||||
|
||||
protected final void decMessageStoreStatistics(Message message) {
|
||||
getMessageStoreStatistics().getMessageCount().decrement();
|
||||
getMessageStoreStatistics().getMessageSize().addSize(-message.getSize());
|
||||
protected static final void decMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) {
|
||||
if (stats != null && message != null) {
|
||||
stats.getMessageCount().decrement();
|
||||
stats.getMessageSize().addSize(-message.getSize());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -182,10 +182,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
|
|||
|
||||
@Override
|
||||
protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
|
||||
if (messageStoreStatistics != null) {
|
||||
messageStoreStatistics.getMessageCount().decrement();
|
||||
messageStoreStatistics.getMessageSize().addSize(-eldest.getValue().getSize());
|
||||
}
|
||||
decMessageStoreStatistics(messageStoreStatistics, eldest.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -446,6 +446,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
checkpointThread.join();
|
||||
}
|
||||
}
|
||||
//clear the cache on shutdown of the store
|
||||
storeCache.clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue