diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index d1605e2b9e..e9f218038b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -797,6 +797,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index CursorAddSync(MessageContext messageContext) { this.messageContext = messageContext; + this.messageContext.message.incrementReferenceCount(); } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java index 66902dcc9b..2c91e915fa 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java @@ -25,9 +25,9 @@ import org.apache.activemq.command.Message; public interface IndexListener { final class MessageContext { - public Message message; - public ConnectionContext context; - public Runnable onCompletion; + public final Message message; + public final ConnectionContext context; + public final Runnable onCompletion; public boolean duplicate; public MessageContext(ConnectionContext context, Message message, Runnable onCompletion) { diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 34bcc6aef0..d40d9475f7 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -330,8 +330,13 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { val entry = QueueEntryRecord(id, queueKey, queueSeq) assert(id.getEntryLocator == null) id.setEntryLocator(EntryLocator(queueKey, queueSeq)) - id.setFutureOrSequenceLong(countDownFuture) - countDownFuture.id = id + if (message.getTransactionId!=null) { + // why does future not get set in tx? + id.setFutureOrSequenceLong(queueSeq) + } else { + id.setFutureOrSequenceLong(countDownFuture) + countDownFuture.id = id + } val a = this.synchronized { if( !delay ) diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 6e3faffe07..451bc04643 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -684,7 +684,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P messageContext.message.decrementReferenceCount() }) val future = uow.enqueue(key, seq, messageContext.message, delay) - messageContext.message.getMessageId.setFutureOrSequenceLong(future) if (indexListener != null) { indexListener.onAdd(messageContext) }