mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@667756 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2e4c688cdc
commit
086db1b4c6
|
@ -22,8 +22,6 @@ import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.locks.Lock;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
|
@ -36,7 +34,6 @@ import org.apache.activemq.kaha.Store;
|
||||||
import org.apache.activemq.kaha.StoreEntry;
|
import org.apache.activemq.kaha.StoreEntry;
|
||||||
import org.apache.activemq.store.MessageRecoveryListener;
|
import org.apache.activemq.store.MessageRecoveryListener;
|
||||||
import org.apache.activemq.store.TopicReferenceStore;
|
import org.apache.activemq.store.TopicReferenceStore;
|
||||||
import org.apache.activemq.util.SubscriptionKey;
|
|
||||||
|
|
||||||
public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
|
public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
|
||||||
|
|
||||||
|
@ -143,10 +140,11 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
||||||
ConsumerMessageRef ref = null;
|
ConsumerMessageRef ref = null;
|
||||||
if((ref = container.remove(messageId)) != null) {
|
if((ref = container.remove(messageId)) != null) {
|
||||||
StoreEntry entry = ref.getAckEntry();
|
StoreEntry entry = ref.getAckEntry();
|
||||||
|
//ensure we get up to-date pointers
|
||||||
|
entry = ackContainer.refresh(entry);
|
||||||
TopicSubAck tsa = ackContainer.get(entry);
|
TopicSubAck tsa = ackContainer.get(entry);
|
||||||
if (tsa != null) {
|
if (tsa != null) {
|
||||||
if (tsa.decrementCount() <= 0) {
|
if (tsa.decrementCount() <= 0) {
|
||||||
entry = ackContainer.refresh(entry);
|
|
||||||
ackContainer.remove(entry);
|
ackContainer.remove(entry);
|
||||||
ReferenceRecord rr = messageContainer.get(messageId);
|
ReferenceRecord rr = messageContainer.get(messageId);
|
||||||
if (rr != null) {
|
if (rr != null) {
|
||||||
|
|
Loading…
Reference in New Issue