mirror of https://github.com/apache/activemq.git
Fix for excessive memory usage for durable consumers -
see https://issues.apache.org/activemq/browse/AMQ-1490 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@602440 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5628b3e4ba
commit
8c541377bd
|
@ -127,8 +127,8 @@ public class AMQMessageStore implements MessageStore {
|
|||
}
|
||||
synchronized (AMQMessageStore.this) {
|
||||
inFlightTxLocations.remove(location);
|
||||
addMessage(message, location);
|
||||
}
|
||||
addMessage(message, location);
|
||||
}
|
||||
|
||||
public void afterRollback() throws Exception {
|
||||
|
@ -153,10 +153,15 @@ public class AMQMessageStore implements MessageStore {
|
|||
messages.put(message.getMessageId(), data);
|
||||
this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
|
||||
}
|
||||
try {
|
||||
asyncWriteTask.wakeup();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
if (messages.size() > this.peristenceAdapter
|
||||
.getMaxCheckpointMessageAddSize()) {
|
||||
flush();
|
||||
} else {
|
||||
try {
|
||||
asyncWriteTask.wakeup();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -233,7 +238,10 @@ public class AMQMessageStore implements MessageStore {
|
|||
messageAcks.add(ack);
|
||||
}
|
||||
}
|
||||
if (data == null) {
|
||||
if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
|
||||
flush();
|
||||
}
|
||||
else if (data == null) {
|
||||
try {
|
||||
asyncWriteTask.wakeup();
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -47,8 +47,6 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class);
|
||||
private TopicReferenceStore topicReferenceStore;
|
||||
private Map<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
|
||||
|
||||
public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
|
||||
super(adapter, topicReferenceStore, destinationName);
|
||||
this.topicReferenceStore = topicReferenceStore;
|
||||
|
@ -158,12 +156,6 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
|
|||
MessageAck ack = new MessageAck();
|
||||
ack.setLastMessageId(messageId);
|
||||
removeMessage(context, ack);
|
||||
|
||||
}
|
||||
try {
|
||||
asyncWriteTask.wakeup();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue