diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java index 08276d3789..2d44769a61 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java @@ -34,6 +34,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; +import org.apache.activemq.store.IndexListener; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -88,7 +89,7 @@ public class JournalMessageStore extends AbstractMessageStore { * Not synchronized since the Journal has better throughput if you increase * the number of concurrent writes that it is doing. */ - public void addMessage(ConnectionContext context, final Message message) throws IOException { + public void addMessage(final ConnectionContext context, final Message message) throws IOException { final MessageId id = message.getMessageId(); @@ -100,7 +101,7 @@ public class JournalMessageStore extends AbstractMessageStore { if (debug) { LOG.debug("Journalled message add for: " + id + ", at: " + location); } - addMessage(message, location); + addMessage(context, message, location); } else { if (debug) { LOG.debug("Journalled transacted message add for: " + id + ", at: " + location); @@ -116,7 +117,7 @@ public class JournalMessageStore extends AbstractMessageStore { } synchronized (JournalMessageStore.this) { inFlightTxLocations.remove(location); - addMessage(message, location); + addMessage(context, message, location); } } @@ -133,11 +134,15 @@ public class JournalMessageStore extends AbstractMessageStore { } } - void addMessage(final Message message, final RecordLocation location) { + void addMessage(ConnectionContext context, final Message message, final RecordLocation location) { synchronized (this) { lastLocation = location; MessageId id = message.getMessageId(); messages.put(id, message); + message.getMessageId().setFutureOrSequenceLong(0l); + if (indexListener != null) { + indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); + } } }