diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index c3841c8527..03513aa2a8 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -782,10 +782,11 @@ public abstract class BaseDestination implements Destination { ack.copy(a); ack = a; // Convert to non-ranged. - ack.setFirstMessageId(node.getMessageId()); - ack.setLastMessageId(node.getMessageId()); ack.setMessageCount(1); } + // always use node messageId so we can access entry/data Location + ack.setFirstMessageId(node.getMessageId()); + ack.setLastMessageId(node.getMessageId()); return ack; } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 4badb09680..c3d5594b5f 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -237,7 +237,9 @@ public class JDBCMessageStore extends AbstractMessageStore { public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - long seq = persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0]; + long seq = ack.getLastMessageId().getEntryLocator() != null ? + (Long) ack.getLastMessageId().getEntryLocator() : + persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0]; // Get a connection and remove the message from the DB TransactionContext c = persistenceAdapter.getTransactionContext(context); @@ -339,6 +341,7 @@ public class JDBCMessageStore extends AbstractMessageStore { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); + msg.getMessageId().setEntryLocator(sequenceId); listener.recoverMessage(msg); lastRecoveredSequenceId.set(sequenceId); lastRecoveredPriority.set(msg.getPriority());