This closes #128

This commit is contained in:
Timothy Bish 2015-07-07 19:03:42 -04:00
commit 6f457d2f5c
1 changed files with 10 additions and 2 deletions

View File

@ -53,6 +53,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
this.messageTable = Collections.synchronizedMap(messageTable);
}
@Override
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
synchronized (messageTable) {
messageTable.put(message.getMessageId(), message);
@ -74,6 +75,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
// }
// }
@Override
public Message getMessage(MessageId identity) throws IOException {
return messageTable.get(identity);
}
@ -82,6 +84,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
// return (String)messageTable.get(identity);
// }
@Override
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
removeMessage(ack.getLastMessageId());
}
@ -91,15 +94,16 @@ public class MemoryMessageStore extends AbstractMessageStore {
Message removed = messageTable.remove(msgId);
if( removed !=null ) {
removed.decrementReferenceCount();
getMessageStoreStatistics().getMessageCount().decrement();
getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
}
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
lastBatchId = null;
}
getMessageStoreStatistics().getMessageCount().decrement();
getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
}
}
@Override
public void recover(MessageRecoveryListener listener) throws Exception {
// the message table is a synchronizedMap - so just have to synchronize
// here
@ -115,6 +119,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
}
}
@Override
public void removeAllMessages(ConnectionContext context) throws IOException {
synchronized (messageTable) {
messageTable.clear();
@ -129,6 +134,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
}
}
@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
@ -151,6 +157,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
}
}
@Override
public void resetBatching() {
lastBatchId = null;
}
@ -160,6 +167,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
lastBatchId = messageId;
}
@Override
public void updateMessage(Message message) {
synchronized (messageTable) {
Message original = messageTable.get(message.getMessageId());