Provide a more robust implementation, and one that deletes messages after they have been consumed

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@490794 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-12-28 20:50:04 +00:00
parent 3093ea0f6e
commit b1c94aa00b
2 changed files with 130 additions and 88 deletions

View File

@ -18,7 +18,6 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
@ -28,6 +27,7 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey;
/**
@ -35,42 +35,45 @@ import org.apache.activemq.util.SubscriptionKey;
*/
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{
private Map ackDatabase;
private Map subscriberDatabase;
private Map batchDatabase;
MessageId lastMessageId;
private Map topicSubMap;
public MemoryTopicMessageStore(ActiveMQDestination destination){
this(destination,new LinkedHashMap(),makeMap(),makeMap(),makeMap());
this(destination,new LRUCache(100,100,0.75f,false),makeMap());
}
protected static Map makeMap(){
return Collections.synchronizedMap(new HashMap());
}
public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase,
Map ackDatabase, Map batchDatabase){
public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase){
super(destination,messageTable);
this.subscriberDatabase=subscriberDatabase;
this.ackDatabase=ackDatabase;
this.batchDatabase=batchDatabase;
this.topicSubMap=makeMap();
}
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
super.addMessage(context,message);
lastMessageId=message.getMessageId();
for(Iterator i=topicSubMap.values().iterator();i.hasNext();){
MemoryTopicSub sub=(MemoryTopicSub)i.next();
sub.addMessage(message.getMessageId(),message);
}
}
public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
throws IOException{
ackDatabase.put(new SubscriptionKey(clientId,subscriptionName),messageId);
public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageId messageId) throws IOException{
SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(key);
if(sub!=null){
sub.removeMessage(messageId);
}
}
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
}
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
@ -78,112 +81,62 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
subscriberDatabase.put(key,info);
MessageId l=retroactive?null:lastMessageId;
if(l!=null){
ackDatabase.put(key,l);
MemoryTopicSub sub=new MemoryTopicSub();
topicSubMap.put(key,sub);
if(retroactive){
for(Iterator i=messageTable.entrySet().iterator();i.hasNext();){
Map.Entry entry=(Entry)i.next();
sub.addMessage((MessageId)entry.getKey(),(Message)entry.getValue());
}
}
subscriberDatabase.put(key,info);
}
public void deleteSubscription(String clientId,String subscriptionName){
org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
ackDatabase.remove(key);
subscriberDatabase.remove(key);
topicSubMap.remove(key);
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
boolean pastLastAck=lastAck==null;
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry)iter.next();
if(pastLastAck){
Object msg=entry.getValue();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message)msg);
}
}else{
pastLastAck=entry.getKey().equals(lastAck);
MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
if(sub!=null){
sub.recoverSubscription(listener);
}
}
listener.finished();
}
}
public void delete(){
super.delete();
ackDatabase.clear();
subscriberDatabase.clear();
lastMessageId=null;
topicSubMap.clear();
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
public synchronized int getMessageCount(String clientId,String subscriberName) throws IOException{
int result=0;
MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
result=messageTable.size();
if(lastAck!=null){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry)iter.next();
if(entry.getKey().equals(lastAck)){
break;
}
result--;
}
}
MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriberName));
if(sub!=null){
result=sub.size();
}
return result;
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception{
SubscriptionKey key = new SubscriptionKey(clientId,subscriptionName);
MessageId lastBatch = (MessageId)batchDatabase.get(key);
if (lastBatch==null) {
//if last batch null - start from last ack
lastBatch = (MessageId)ackDatabase.get(key);
}
boolean pastLackBatch=lastBatch==null;
MessageId lastId = null;
// the message table is a synchronizedMap - so just have to synchronize here
int count = 0;
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() &&count < maxReturned ;){
Map.Entry entry=(Entry)iter.next();
if(pastLackBatch){
count++;
Object msg=entry.getValue();
lastId = (MessageId)entry.getKey();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message)msg);
}
}else{
pastLackBatch=entry.getKey().equals(lastBatch);
}
}
if (lastId != null) {
batchDatabase.put(key,lastId);
}
listener.finished();
MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
if(sub!=null){
sub.recoverNextMessages(maxReturned,listener);
}
}
public void resetBatching(String clientId,String subscriptionName){
batchDatabase.remove(new SubscriptionKey(clientId,subscriptionName));
MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
if(sub!=null){
sub.resetBatching();
}
}
}

View File

@ -0,0 +1,89 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
/**
* A holder for a durable subscriber
*
* @version $Revision: 1.7 $
*/
class MemoryTopicSub{
private Map map=new LinkedHashMap();
private MessageId lastBatch;
void addMessage(MessageId id,Message message){
map.put(id,message);
}
void removeMessage(MessageId id){
map.remove(id);
}
int size(){
return map.size();
}
void recoverSubscription(MessageRecoveryListener listener) throws Exception{
for(Iterator iter=map.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry)iter.next();
Object msg=entry.getValue();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message)msg);
}
}
listener.finished();
}
void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
boolean pastLackBatch=lastBatch==null;
MessageId lastId=null;
// the message table is a synchronizedMap - so just have to synchronize here
int count=0;
for(Iterator iter=map.entrySet().iterator();iter.hasNext()&&count<maxReturned;){
Map.Entry entry=(Entry)iter.next();
if(pastLackBatch){
count++;
Object msg=entry.getValue();
lastId=(MessageId)entry.getKey();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message)msg);
}
}else{
pastLackBatch=entry.getKey().equals(lastBatch);
}
}
if(lastId!=null){
lastBatch=lastId;
}
listener.finished();
}
void resetBatching(){
lastBatch=null;
}
}