clear last batch id if no more messages left to dispatch to a durable subscriber

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@491089 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-12-29 20:21:10 +00:00
parent 8b0f88a257
commit cd687e389c
4 changed files with 80 additions and 47 deletions

View File

@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
@ -29,10 +28,8 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.rapid.RapidMessageReference;
/**
* @version $Revision: 1.5 $
@ -70,7 +67,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
container.getListContainer().add(ref);
container.add(ref);
}
}
}
@ -80,7 +77,10 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
ConsumerMessageRef ref=container.remove();
if(container.isEmpty()){
container.reset();
}
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
@ -112,7 +112,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info);
}
//add the subscriber
// add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
if(retroactive){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
@ -135,7 +135,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
Object msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
@ -158,14 +158,16 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
int count=0;
StoreEntry entry=container.getBatchEntry();
if(entry==null){
entry=container.getListContainer().getFirst();
entry=container.getEntry();
}else{
entry=container.getListContainer().refresh(entry);
entry=container.getListContainer().getNext(entry);
entry=container.refreshEntry(entry);
if(entry!=null){
entry=container.getNextEntry(entry);
}
}
if(entry!=null){
do{
ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
ConsumerMessageRef consumerRef=container.get(entry);
Object msg=messageContainer.get(consumerRef.getMessageEntry());
if(msg!=null){
if(msg.getClass()==String.class){
@ -178,7 +180,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
count++;
}
container.setBatchEntry(entry);
entry=container.getListContainer().getNext(entry);
entry=container.getNextEntry(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
}
@ -210,11 +212,11 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
subscriberMessages.put(key,tsc);
return container;
}
protected void removeSubscriberMessageContainer(Object key) throws IOException {
protected void removeSubscriberMessageContainer(Object key) throws IOException{
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
@ -234,7 +236,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
return container.getListContainer().size();
return container.size();
}
/**
@ -251,8 +253,6 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
messageContainer.add(messageRef);
}
/**
* @param identity
* @return String
@ -263,7 +263,6 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
return null;
}
/**
* @param context
* @throws IOException
@ -274,11 +273,10 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
ackContainer.clear();
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
container.getListContainer().clear();
container.clear();
}
}
public synchronized void resetBatching(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);

View File

@ -14,6 +14,7 @@
package org.apache.activemq.store.kahadaptor;
import java.util.Iterator;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.StoreEntry;
@ -44,22 +45,54 @@ import org.apache.activemq.kaha.StoreEntry;
this.batchEntry=batchEntry;
}
/**
* @return the listContainer
*/
public ListContainer getListContainer(){
return this.listContainer;
}
/**
* @param listContainer the listContainer to set
*/
public void setListContainer(ListContainer container){
this.listContainer=container;
}
public void reset() {
batchEntry = null;
}
public boolean isEmpty() {
return listContainer.isEmpty();
}
public void add(ConsumerMessageRef ref) {
listContainer.add(ref);
}
public ConsumerMessageRef remove() {
ConsumerMessageRef result = (ConsumerMessageRef)listContainer.removeFirst();
if (listContainer.isEmpty()) {
reset();
}
return result;
}
public ConsumerMessageRef get(StoreEntry entry) {
return (ConsumerMessageRef)listContainer.get(entry);
}
public StoreEntry getEntry() {
return listContainer.getFirst();
}
public StoreEntry refreshEntry(StoreEntry entry) {
return listContainer.refresh(entry);
}
public StoreEntry getNextEntry(StoreEntry entry) {
return listContainer.getNext(entry);
}
public Iterator iterator() {
return listContainer.iterator();
}
public int size() {
return listContainer.size();
}
public void clear() {
reset();
listContainer.clear();
}
}

View File

@ -38,6 +38,9 @@ class MemoryTopicSub{
void removeMessage(MessageId id){
map.remove(id);
if (map.isEmpty()) {
lastBatch=null;
}
}
int size(){

View File

@ -22,7 +22,6 @@ import org.apache.activeio.journal.active.Location;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
@ -78,7 +77,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
container.getListContainer().add(ref);
container.add(ref);
}
}
}
@ -88,7 +87,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
ConsumerMessageRef ref=(ConsumerMessageRef)container.remove();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
@ -142,7 +141,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref
.getMessageEntry());
@ -163,14 +162,14 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
int count=0;
StoreEntry entry=container.getBatchEntry();
if(entry==null){
entry=container.getListContainer().getFirst();
entry=container.getEntry();
}else{
entry=container.getListContainer().refresh(entry);
entry=container.getListContainer().getNext(entry);
entry=container.refreshEntry(entry);
entry=container.getNextEntry(entry);
}
if(entry!=null){
do{
ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
ConsumerMessageRef consumerRef=container.get(entry);
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef
.getMessageEntry());
if(messageReference!=null){
@ -179,7 +178,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
count++;
}
container.setBatchEntry(entry);
entry=container.getListContainer().getNext(entry);
entry=container.getNextEntry(entry);
}while(entry!=null&&count<maxReturned && listener.hasSpace());
}
}
@ -210,7 +209,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
protected void removeSubscriberMessageContainer(Object key) throws IOException {
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
@ -230,7 +229,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
return container.getListContainer().size();
return container.size();
}
/**
@ -271,7 +270,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
ackContainer.clear();
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
container.getListContainer().clear();
container.clear();
}
}
@ -294,7 +293,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
ConsumerMessageRef ref=(ConsumerMessageRef)container.remove();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){