mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5277 - jdbc store make use of entryLocator on ack
This commit is contained in:
parent
a9b8d98ed7
commit
3b5d89a78b
|
@ -782,10 +782,11 @@ public abstract class BaseDestination implements Destination {
|
||||||
ack.copy(a);
|
ack.copy(a);
|
||||||
ack = a;
|
ack = a;
|
||||||
// Convert to non-ranged.
|
// Convert to non-ranged.
|
||||||
ack.setFirstMessageId(node.getMessageId());
|
|
||||||
ack.setLastMessageId(node.getMessageId());
|
|
||||||
ack.setMessageCount(1);
|
ack.setMessageCount(1);
|
||||||
}
|
}
|
||||||
|
// always use node messageId so we can access entry/data Location
|
||||||
|
ack.setFirstMessageId(node.getMessageId());
|
||||||
|
ack.setLastMessageId(node.getMessageId());
|
||||||
return ack;
|
return ack;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -237,7 +237,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
|
|
||||||
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
|
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
|
||||||
|
|
||||||
long seq = persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
|
long seq = ack.getLastMessageId().getEntryLocator() != null ?
|
||||||
|
(Long) ack.getLastMessageId().getEntryLocator() :
|
||||||
|
persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
|
||||||
|
|
||||||
// Get a connection and remove the message from the DB
|
// Get a connection and remove the message from the DB
|
||||||
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
||||||
|
@ -339,6 +341,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
|
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
|
||||||
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
|
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
|
||||||
msg.getMessageId().setBrokerSequenceId(sequenceId);
|
msg.getMessageId().setBrokerSequenceId(sequenceId);
|
||||||
|
msg.getMessageId().setEntryLocator(sequenceId);
|
||||||
listener.recoverMessage(msg);
|
listener.recoverMessage(msg);
|
||||||
lastRecoveredSequenceId.set(sequenceId);
|
lastRecoveredSequenceId.set(sequenceId);
|
||||||
lastRecoveredPriority.set(msg.getPriority());
|
lastRecoveredPriority.set(msg.getPriority());
|
||||||
|
|
Loading…
Reference in New Issue