diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index 1550436577..9a213358e3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -22,8 +22,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -36,7 +34,6 @@ import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicReferenceStore; -import org.apache.activemq.util.SubscriptionKey; public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore { @@ -143,10 +140,11 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic ConsumerMessageRef ref = null; if((ref = container.remove(messageId)) != null) { StoreEntry entry = ref.getAckEntry(); + //ensure we get up to-date pointers + entry = ackContainer.refresh(entry); TopicSubAck tsa = ackContainer.get(entry); if (tsa != null) { if (tsa.decrementCount() <= 0) { - entry = ackContainer.refresh(entry); ackContainer.remove(entry); ReferenceRecord rr = messageContainer.get(messageId); if (rr != null) {