diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java index e6f53f6945..d90cf0077d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; import org.apache.activemq.broker.ConnectionContext; @@ -28,6 +27,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.util.LRUCache; import org.apache.activemq.util.SubscriptionKey; /** @@ -35,42 +35,45 @@ import org.apache.activemq.util.SubscriptionKey; */ public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{ - private Map ackDatabase; private Map subscriberDatabase; - private Map batchDatabase; - MessageId lastMessageId; + private Map topicSubMap; public MemoryTopicMessageStore(ActiveMQDestination destination){ - this(destination,new LinkedHashMap(),makeMap(),makeMap(),makeMap()); + this(destination,new LRUCache(100,100,0.75f,false),makeMap()); } protected static Map makeMap(){ return Collections.synchronizedMap(new HashMap()); } - public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase, - Map ackDatabase, Map batchDatabase){ + public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase){ super(destination,messageTable); this.subscriberDatabase=subscriberDatabase; - this.ackDatabase=ackDatabase; - this.batchDatabase=batchDatabase; + this.topicSubMap=makeMap(); } public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ super.addMessage(context,message); - lastMessageId=message.getMessageId(); + for(Iterator i=topicSubMap.values().iterator();i.hasNext();){ + MemoryTopicSub sub=(MemoryTopicSub)i.next(); + sub.addMessage(message.getMessageId(),message); + } } - public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId) - throws IOException{ - ackDatabase.put(new SubscriptionKey(clientId,subscriptionName),messageId); + public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName, + MessageId messageId) throws IOException{ + SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName); + MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(key); + if(sub!=null){ + sub.removeMessage(messageId); + } } public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{ return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName)); } - public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) + public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) throws IOException{ SubscriptionInfo info=new SubscriptionInfo(); info.setDestination(destination); @@ -78,112 +81,62 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic info.setSelector(selector); info.setSubcriptionName(subscriptionName); SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName); - subscriberDatabase.put(key,info); - MessageId l=retroactive?null:lastMessageId; - if(l!=null){ - ackDatabase.put(key,l); + MemoryTopicSub sub=new MemoryTopicSub(); + topicSubMap.put(key,sub); + if(retroactive){ + for(Iterator i=messageTable.entrySet().iterator();i.hasNext();){ + Map.Entry entry=(Entry)i.next(); + sub.addMessage((MessageId)entry.getKey(),(Message)entry.getValue()); + } } + subscriberDatabase.put(key,info); } public void deleteSubscription(String clientId,String subscriptionName){ org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName); - ackDatabase.remove(key); subscriberDatabase.remove(key); + topicSubMap.remove(key); } public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) throws Exception{ - MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriptionName)); - boolean pastLastAck=lastAck==null; - // the message table is a synchronizedMap - so just have to synchronize here - synchronized(messageTable){ - for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ - Map.Entry entry=(Entry)iter.next(); - if(pastLastAck){ - Object msg=entry.getValue(); - if(msg.getClass()==String.class){ - listener.recoverMessageReference((String)msg); - }else{ - listener.recoverMessage((Message)msg); - } - }else{ - pastLastAck=entry.getKey().equals(lastAck); - } - } - listener.finished(); + MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName)); + if(sub!=null){ + sub.recoverSubscription(listener); } - } - public void delete(){ super.delete(); - ackDatabase.clear(); subscriberDatabase.clear(); - lastMessageId=null; + topicSubMap.clear(); } public SubscriptionInfo[] getAllSubscriptions() throws IOException{ return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]); } - - public int getMessageCount(String clientId,String subscriberName) throws IOException{ + public synchronized int getMessageCount(String clientId,String subscriberName) throws IOException{ int result=0; - MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriberName)); - // the message table is a synchronizedMap - so just have to synchronize here - synchronized(messageTable){ - result=messageTable.size(); - if(lastAck!=null){ - for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ - Map.Entry entry=(Entry)iter.next(); - if(entry.getKey().equals(lastAck)){ - break; - } - result--; - } - } + MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriberName)); + if(sub!=null){ + result=sub.size(); } return result; } - public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, MessageRecoveryListener listener) throws Exception{ - SubscriptionKey key = new SubscriptionKey(clientId,subscriptionName); - MessageId lastBatch = (MessageId)batchDatabase.get(key); - if (lastBatch==null) { - //if last batch null - start from last ack - lastBatch = (MessageId)ackDatabase.get(key); - } - boolean pastLackBatch=lastBatch==null; - MessageId lastId = null; - // the message table is a synchronizedMap - so just have to synchronize here - int count = 0; - synchronized(messageTable){ - for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() &&count < maxReturned ;){ - Map.Entry entry=(Entry)iter.next(); - if(pastLackBatch){ - count++; - Object msg=entry.getValue(); - lastId = (MessageId)entry.getKey(); - if(msg.getClass()==String.class){ - listener.recoverMessageReference((String)msg); - }else{ - listener.recoverMessage((Message)msg); - } - }else{ - pastLackBatch=entry.getKey().equals(lastBatch); - } - } - if (lastId != null) { - batchDatabase.put(key,lastId); - } - listener.finished(); + MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName)); + if(sub!=null){ + sub.recoverNextMessages(maxReturned,listener); } } public void resetBatching(String clientId,String subscriptionName){ - batchDatabase.remove(new SubscriptionKey(clientId,subscriptionName)); + MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName)); + if(sub!=null){ + sub.resetBatching(); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java new file mode 100755 index 0000000000..079c30076a --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java @@ -0,0 +1,89 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq.store.memory; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.MessageRecoveryListener; + +/** + * A holder for a durable subscriber + * + * @version $Revision: 1.7 $ + */ +class MemoryTopicSub{ + + private Map map=new LinkedHashMap(); + private MessageId lastBatch; + + void addMessage(MessageId id,Message message){ + map.put(id,message); + } + + void removeMessage(MessageId id){ + map.remove(id); + } + + int size(){ + return map.size(); + } + + void recoverSubscription(MessageRecoveryListener listener) throws Exception{ + for(Iterator iter=map.entrySet().iterator();iter.hasNext();){ + Map.Entry entry=(Entry)iter.next(); + Object msg=entry.getValue(); + if(msg.getClass()==String.class){ + listener.recoverMessageReference((String)msg); + }else{ + listener.recoverMessage((Message)msg); + } + } + listener.finished(); + } + + void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + boolean pastLackBatch=lastBatch==null; + MessageId lastId=null; + // the message table is a synchronizedMap - so just have to synchronize here + int count=0; + for(Iterator iter=map.entrySet().iterator();iter.hasNext()&&count