From 8c541377bdba9baaf06779e0d8095727a7628f1a Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Sat, 8 Dec 2007 10:03:26 +0000 Subject: [PATCH] Fix for excessive memory usage for durable consumers - see https://issues.apache.org/activemq/browse/AMQ-1490 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@602440 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/store/amq/AMQMessageStore.java | 20 +++++++++++++------ .../store/amq/AMQTopicMessageStore.java | 8 -------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index 72fa3a291c..bc57769c3f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -127,8 +127,8 @@ public class AMQMessageStore implements MessageStore { } synchronized (AMQMessageStore.this) { inFlightTxLocations.remove(location); - addMessage(message, location); } + addMessage(message, location); } public void afterRollback() throws Exception { @@ -153,10 +153,15 @@ public class AMQMessageStore implements MessageStore { messages.put(message.getMessageId(), data); this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId()); } - try { - asyncWriteTask.wakeup(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); + if (messages.size() > this.peristenceAdapter + .getMaxCheckpointMessageAddSize()) { + flush(); + } else { + try { + asyncWriteTask.wakeup(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } } } @@ -233,7 +238,10 @@ public class AMQMessageStore implements MessageStore { messageAcks.add(ack); } } - if (data == null) { + if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) { + flush(); + } + else if (data == null) { try { asyncWriteTask.wakeup(); } catch (InterruptedException e) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java index 2d40489fa7..556edca07c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java @@ -47,8 +47,6 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class); private TopicReferenceStore topicReferenceStore; - private Map ackedLastAckLocations = new HashMap(); - public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) { super(adapter, topicReferenceStore, destinationName); this.topicReferenceStore = topicReferenceStore; @@ -158,12 +156,6 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag MessageAck ack = new MessageAck(); ack.setLastMessageId(messageId); removeMessage(context, ack); - - } - try { - asyncWriteTask.wakeup(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); } }