AMQ4677Test.testSendAndReceiveAllMessages - demoed the lack of reference increment for transacted send and the non completion of transacted futures in leveldb

This commit is contained in:
gtully 2014-08-30 23:36:05 +01:00
parent 862f50355f
commit 8a37f97315
4 changed files with 11 additions and 6 deletions

View File

@ -797,6 +797,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
CursorAddSync(MessageContext messageContext) {
this.messageContext = messageContext;
this.messageContext.message.incrementReferenceCount();
}
@Override

View File

@ -25,9 +25,9 @@ import org.apache.activemq.command.Message;
public interface IndexListener {
final class MessageContext {
public Message message;
public ConnectionContext context;
public Runnable onCompletion;
public final Message message;
public final ConnectionContext context;
public final Runnable onCompletion;
public boolean duplicate;
public MessageContext(ConnectionContext context, Message message, Runnable onCompletion) {

View File

@ -330,8 +330,13 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
val entry = QueueEntryRecord(id, queueKey, queueSeq)
assert(id.getEntryLocator == null)
id.setEntryLocator(EntryLocator(queueKey, queueSeq))
id.setFutureOrSequenceLong(countDownFuture)
countDownFuture.id = id
if (message.getTransactionId!=null) {
// why does future not get set in tx?
id.setFutureOrSequenceLong(queueSeq)
} else {
id.setFutureOrSequenceLong(countDownFuture)
countDownFuture.id = id
}
val a = this.synchronized {
if( !delay )

View File

@ -684,7 +684,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
messageContext.message.decrementReferenceCount()
})
val future = uow.enqueue(key, seq, messageContext.message, delay)
messageContext.message.getMessageId.setFutureOrSequenceLong(future)
if (indexListener != null) {
indexListener.onAdd(messageContext)
}