diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java index 0d873f9036..bdac87420b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java @@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.kaha.ListContainer; @@ -29,10 +28,8 @@ import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreEntry; -import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.rapid.RapidMessageReference; /** * @version $Revision: 1.5 $ @@ -70,7 +67,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess ConsumerMessageRef ref=new ConsumerMessageRef(); ref.setAckEntry(ackEntry); ref.setMessageEntry(messageEntry); - container.getListContainer().add(ref); + container.add(ref); } } } @@ -80,7 +77,10 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess String subcriberId=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId); if(container!=null){ - ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst(); + ConsumerMessageRef ref=container.remove(); + if(container.isEmpty()){ + container.reset(); + } if(ref!=null){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); if(tsa!=null){ @@ -112,7 +112,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess if(!subscriberContainer.containsKey(key)){ subscriberContainer.put(key,info); } - //add the subscriber + // add the subscriber ListContainer container=addSubscriberMessageContainer(key); if(retroactive){ for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ @@ -135,7 +135,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess String key=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); if(container!=null){ - for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + for(Iterator i=container.iterator();i.hasNext();){ ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); Object msg=messageContainer.get(ref.getMessageEntry()); if(msg!=null){ @@ -158,14 +158,16 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess int count=0; StoreEntry entry=container.getBatchEntry(); if(entry==null){ - entry=container.getListContainer().getFirst(); + entry=container.getEntry(); }else{ - entry=container.getListContainer().refresh(entry); - entry=container.getListContainer().getNext(entry); + entry=container.refreshEntry(entry); + if(entry!=null){ + entry=container.getNextEntry(entry); + } } if(entry!=null){ do{ - ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry); + ConsumerMessageRef consumerRef=container.get(entry); Object msg=messageContainer.get(consumerRef.getMessageEntry()); if(msg!=null){ if(msg.getClass()==String.class){ @@ -178,7 +180,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess count++; } container.setBatchEntry(entry); - entry=container.getListContainer().getNext(entry); + entry=container.getNextEntry(entry); }while(entry!=null&&count