reflect need for store to set messageId.setFutureOrSequenceLong for journaled jdbc

This commit is contained in:
gtully 2014-08-30 23:51:59 +01:00
parent 8a37f97315
commit e8f8155141
1 changed files with 9 additions and 4 deletions

View File

@ -34,6 +34,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -88,7 +89,7 @@ public class JournalMessageStore extends AbstractMessageStore {
* Not synchronized since the Journal has better throughput if you increase * Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing. * the number of concurrent writes that it is doing.
*/ */
public void addMessage(ConnectionContext context, final Message message) throws IOException { public void addMessage(final ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId(); final MessageId id = message.getMessageId();
@ -100,7 +101,7 @@ public class JournalMessageStore extends AbstractMessageStore {
if (debug) { if (debug) {
LOG.debug("Journalled message add for: " + id + ", at: " + location); LOG.debug("Journalled message add for: " + id + ", at: " + location);
} }
addMessage(message, location); addMessage(context, message, location);
} else { } else {
if (debug) { if (debug) {
LOG.debug("Journalled transacted message add for: " + id + ", at: " + location); LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
@ -116,7 +117,7 @@ public class JournalMessageStore extends AbstractMessageStore {
} }
synchronized (JournalMessageStore.this) { synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
addMessage(message, location); addMessage(context, message, location);
} }
} }
@ -133,11 +134,15 @@ public class JournalMessageStore extends AbstractMessageStore {
} }
} }
void addMessage(final Message message, final RecordLocation location) { void addMessage(ConnectionContext context, final Message message, final RecordLocation location) {
synchronized (this) { synchronized (this) {
lastLocation = location; lastLocation = location;
MessageId id = message.getMessageId(); MessageId id = message.getMessageId();
messages.put(id, message); messages.put(id, message);
message.getMessageId().setFutureOrSequenceLong(0l);
if (indexListener != null) {
indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
}
} }
} }