From a11060452b5383bae75cc35af777208a11637098 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 27 Nov 2006 13:40:11 +0000 Subject: [PATCH] support for durable store cursors and retroactive subscribers git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@479614 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/AbstractSubscription.java | 25 +++++++ .../region/DurableTopicSubscription.java | 6 ++ .../broker/region/PrefetchSubscription.java | 2 +- .../activemq/broker/region/Subscription.java | 2 +- .../broker/region/SubscriptionRecovery.java | 49 +++++++++++++ .../cursors/AbstractPendingMessageCursor.java | 4 ++ .../cursors/FilePendingMessageCursor.java | 33 ++++----- .../region/cursors/PendingMessageCursor.java | 7 ++ .../cursors/StoreDurableSubscriberCursor.java | 4 ++ .../FixedCountSubscriptionRecoveryPolicy.java | 14 +--- .../FixedSizedSubscriptionRecoveryPolicy.java | 32 +++------ .../LastImageSubscriptionRecoveryPolicy.java | 21 ++---- .../policy/NoSubscriptionRecoveryPolicy.java | 3 +- .../QueryBasedSubscriptionRecoveryPolicy.java | 35 ++++------ .../policy/SubscriptionRecoveryPolicy.java | 4 +- .../TimedSubscriptionRecoveryPolicy.java | 29 +++----- .../list/DestinationBasedMessageList.java | 2 +- .../activemq/memory/list/MessageList.java | 6 +- .../memory/list/SimpleMessageList.java | 17 ++--- .../kahadaptor/KahaTopicMessageStore.java | 14 +++- .../store/memory/MemoryMessageStore.java | 28 +++++++- .../store/memory/MemoryTopicMessageStore.java | 69 ++++++++++++------- .../store/rapid/RapidTopicMessageStore.java | 14 +++- 23 files changed, 264 insertions(+), 156 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 076d26e06b..6f30b45788 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -159,4 +159,29 @@ abstract public class AbstractSubscription implements Subscription { public boolean isRecoveryRequired(){ return true; } + + public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception{ + boolean result = false; + MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); + try { + msgContext.setDestination(message.getRegionDestination().getActiveMQDestination()); + msgContext.setMessageReference(message); + result = matches(message,msgContext); + if (result) { + doAddRecoveredMessage(message); + } + + }finally { + msgContext.clear(); + } + return result; + } + + public ActiveMQDestination getActiveMQDestination() { + return info != null ? info.getDestination() : null; + } + + protected void doAddRecoveredMessage(MessageReference message) throws Exception { + add(message); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index cb6dd0af21..51e2bee17b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -152,6 +152,10 @@ public class DurableTopicSubscription extends PrefetchSubscription { super.add(node); } + protected void doAddRecoveredMessage(MessageReference message) throws Exception { + pending.addRecoveredMessage(message); + } + public int getPendingQueueSize() { if( active || keepDurableSubsActive ) { return super.getPendingQueueSize(); @@ -218,5 +222,7 @@ public class DurableTopicSubscription extends PrefetchSubscription { } dispatched.clear(); } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 6caab4906a..a445059e53 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -390,7 +390,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ // Message may have been sitting in the pending list a while // waiting for the consumer to ak the message. - if( node.isExpired() ) { + if( node != QueueMessageReference.NULL_MESSAGE && node.isExpired() ) { continue; // just drop it. } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java index dd06805478..abae931b3d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -34,7 +34,7 @@ import javax.management.ObjectName; /** * @version $Revision: 1.5 $ */ -public interface Subscription { +public interface Subscription extends SubscriptionRecovery { /** * Used to add messages that match the subscription. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java new file mode 100644 index 0000000000..efe2a9b144 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java @@ -0,0 +1,49 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ActiveMQDestination; + +/** + * An interface for recoverying transient messages held by the broker + * for retractive recovery for subscribers + * + * @version $Revision$ + */ +public interface SubscriptionRecovery { + + + /** + * Add a message to the SubscriptionRecovery + * @param context + * @param message + * @return true if the message is accepted + * @throws Exception + */ + boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception; + + + /** + * @return the Destination associated with this Subscription + */ + ActiveMQDestination getActiveMQDestination(); + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 70e00c1550..aa1b66caf4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -51,6 +51,10 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{ public void addMessageLast(MessageReference node) throws Exception{ } + + public void addRecoveredMessage(MessageReference node) throws Exception{ + addMessageLast(node); + } public void clear(){ } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 80615c2df2..01387f869e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -17,8 +17,7 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.Message; @@ -45,7 +44,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple private ListContainer diskList; private Iterator iter=null; private Destination regionDestination; - private ReentrantLock iterLock=new ReentrantLock(); + private AtomicBoolean iterating=new AtomicBoolean(); + private boolean flushRequired; /** * @param name @@ -67,17 +67,19 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple * reset the cursor * */ - public void reset(){ - try{ - iterLock.lockInterruptibly(); - iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator(); - }catch(InterruptedException e){ - log.warn("Failed to get lock ",e); + public synchronized void reset(){ + synchronized(iterating){ + iterating.set(true); } + iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator(); } - public void release(){ - iterLock.unlock(); + public synchronized void release(){ + iterating.set(false); + if(flushRequired){ + flushRequired=false; + flushToDisk(); + } } public synchronized void destroy(){ @@ -219,13 +221,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ if(newPercentUsage>=100){ - try{ - if(iterLock.tryLock(500,TimeUnit.MILLISECONDS)){ + synchronized(iterating){ + flushRequired=true; + if(!iterating.get()){ flushToDisk(); - iterLock.unlock(); + flushRequired=false; } - }catch(InterruptedException e){ - log.warn("caught an exception aquiring lock",e); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index c7a9c668d5..fb7a29fc9a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -76,6 +76,13 @@ public interface PendingMessageCursor extends Service{ * @throws Exception */ public void addMessageFirst(MessageReference node) throws Exception; + + /** + * Add a message recovered from a retroactive policy + * @param node + * @throws Exception + */ + public void addRecoveredMessage(MessageReference node) throws Exception; /** * @return true if there pending messages to dispatch diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 759c9b1330..925e057644 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -147,6 +147,10 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ } } } + + public void addRecoveredMessage(MessageReference node) throws Exception{ + nonPersistent.addMessageLast(node); + } public void clear(){ pendingCount=0; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java index 5058c9acb8..e8594e37bf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -53,7 +54,7 @@ public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecover return true; } - synchronized public void recover(ConnectionContext context,Topic topic,Subscription sub) throws Exception{ + synchronized public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{ // Re-dispatch the last message seen. int t=tail; // The buffer may not have rolled over yet..., start from the front @@ -63,18 +64,9 @@ public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecover if(messages[t]==null) return; // Keep dispatching until t hit's tail again. - MessageEvaluationContext msgContext=context.getMessageEvaluationContext(); do{ MessageReference node=messages[t]; - try{ - msgContext.setDestination(node.getRegionDestination().getActiveMQDestination()); - msgContext.setMessageReference(node); - if(sub.matches(node,msgContext)){ - sub.add(node); - } - }finally{ - msgContext.clear(); - } + sub.addRecoveredMessage(context,node); t++; if(t>=messages.length) t=0; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java index 27791777fe..fd29ffa36c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java @@ -17,23 +17,18 @@ */ package org.apache.activemq.broker.region.policy; +import java.util.Iterator; +import java.util.List; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; -import org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy.TimestampWrapper; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; -import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.memory.list.DestinationBasedMessageList; import org.apache.activemq.memory.list.MessageList; import org.apache.activemq.memory.list.SimpleMessageList; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - /** * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed * amount of memory available in RAM for message history which is evicted in @@ -61,22 +56,13 @@ public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecover return true; } - public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception { + public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{ // Re-dispatch the messages from the buffer. - List copy = buffer.getMessages(sub); - if( !copy.isEmpty() ) { - MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); - try { - for (Iterator iter = copy.iterator(); iter.hasNext();) { - MessageReference node = (MessageReference) iter.next(); - msgContext.setDestination(node.getRegionDestination().getActiveMQDestination()); - msgContext.setMessageReference(node); - if (sub.matches(node, msgContext) ) { - sub.add(node); - } - } - } finally { - msgContext.clear(); + List copy=buffer.getMessages(sub.getActiveMQDestination()); + if(!copy.isEmpty()){ + for(Iterator iter=copy.iterator();iter.hasNext();){ + MessageReference node=(MessageReference)iter.next(); + sub.addRecoveredMessage(context,node); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java index 80febba977..bd51762fff 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java @@ -19,15 +19,13 @@ package org.apache.activemq.broker.region.policy; import java.util.ArrayList; import java.util.List; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.filter.MessageEvaluationContext; /** * This implementation of {@link SubscriptionRecoveryPolicy} will only keep @@ -46,20 +44,11 @@ public class LastImageSubscriptionRecoveryPolicy implements SubscriptionRecovery return true; } - public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception { + public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{ // Re-dispatch the last message seen. - MessageReference node = lastImage; - if( node != null ){ - MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); - try { - msgContext.setDestination(node.getRegionDestination().getActiveMQDestination()); - msgContext.setMessageReference(node); - if (sub.matches(node, msgContext)) { - sub.add(node); - } - } finally { - msgContext.clear(); - } + MessageReference node=lastImage; + if(node!=null){ + sub.addRecoveredMessage(context,node); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java index 1ce95bb75e..2725247edf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java @@ -20,6 +20,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -43,7 +44,7 @@ public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy return true; } - public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception { + public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception { } public void start() throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java index ceb6a119f3..98b177d687 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java @@ -24,6 +24,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -66,20 +67,16 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover return query.validateUpdate(message.getMessage()); } - public void recover(ConnectionContext context, final Topic topic, final Subscription sub) throws Exception { - if (query != null) { - final MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); - try { - ActiveMQDestination destination = sub.getConsumerInfo().getDestination(); - query.execute(destination, new MessageListener() { - public void onMessage(Message message) { - dispatchInitialMessage(message, topic, msgContext, sub); - } - }); - } - finally { - msgContext.clear(); - } + public void recover(final ConnectionContext context,final Topic topic,final SubscriptionRecovery sub) + throws Exception{ + if(query!=null){ + ActiveMQDestination destination=sub.getActiveMQDestination(); + query.execute(destination,new MessageListener(){ + + public void onMessage(Message message){ + dispatchInitialMessage(message,topic,context,sub); + } + }); } } @@ -107,21 +104,17 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover return new org.apache.activemq.command.Message[0]; } - protected void dispatchInitialMessage(Message message, Destination regionDestination, MessageEvaluationContext msgContext, Subscription sub) { + protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) { try { ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null); ActiveMQDestination destination = activeMessage.getDestination(); if (destination == null) { - destination = sub.getConsumerInfo().getDestination(); + destination = sub.getActiveMQDestination(); activeMessage.setDestination(destination); } activeMessage.setRegionDestination(regionDestination); configure(activeMessage); - msgContext.setDestination(destination); - msgContext.setMessageReference(activeMessage); - if (sub.matches(activeMessage, msgContext)) { - sub.add(activeMessage); - } + sub.addRecoveredMessage(context,activeMessage); } catch (Throwable e) { log.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java index 8ac640150b..4917a6fb24 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java @@ -22,6 +22,7 @@ import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -55,7 +56,7 @@ public interface SubscriptionRecoveryPolicy extends Service { * @param node * @throws Exception */ - void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception; + void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception; /** @@ -67,6 +68,7 @@ public interface SubscriptionRecoveryPolicy extends Service { /** * Used to copy the policy object. + * @return the copy */ SubscriptionRecoveryPolicy copy(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java index 946b0d01cd..e9183be9ba 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java @@ -22,10 +22,9 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -80,25 +79,15 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli return true; } - public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception { - + public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{ // Re-dispatch the messages from the buffer. - ArrayList copy = new ArrayList(buffer); - - if (!copy.isEmpty()) { - MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); - try { - for (Iterator iter = copy.iterator(); iter.hasNext();) { - TimestampWrapper timestampWrapper = (TimestampWrapper) iter.next(); - MessageReference message = timestampWrapper.message; - msgContext.setDestination(message.getRegionDestination().getActiveMQDestination()); - msgContext.setMessageReference(message); - if (sub.matches(message, msgContext)) { - sub.add(timestampWrapper.message); - } - } - }finally { - msgContext.clear(); + ArrayList copy=new ArrayList(buffer); + if(!copy.isEmpty()){ + MessageEvaluationContext msgContext=context.getMessageEvaluationContext(); + for(Iterator iter=copy.iterator();iter.hasNext();){ + TimestampWrapper timestampWrapper=(TimestampWrapper)iter.next(); + MessageReference message=timestampWrapper.message; + sub.addRecoveredMessage(context,message); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java b/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java index 73088252ab..9537abf0ae 100644 --- a/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java @@ -74,7 +74,7 @@ public class DestinationBasedMessageList implements MessageList { return getMessages(sub.getConsumerInfo().getDestination()); } - protected List getMessages(ActiveMQDestination destination) { + public List getMessages(ActiveMQDestination destination) { Set set = null; synchronized (lock) { set = subscriptionIndex.get(destination); diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java b/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java index 3f1b9146eb..f90e71ead9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java @@ -17,13 +17,11 @@ */ package org.apache.activemq.memory.list; +import java.util.List; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; -import java.util.List; - /** * A container of messages which is used to store messages and then * replay them later for a given subscription. @@ -37,7 +35,7 @@ public interface MessageList { /** * Returns the current list of MessageReference objects for the given subscription */ - List getMessages(Subscription sub); + List getMessages(ActiveMQDestination destination); /** * @param destination diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java b/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java index 71af7203ae..82b6c6efd7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java @@ -17,20 +17,17 @@ */ package org.apache.activemq.memory.list; -import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.network.DemandForwardingBridge; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.DestinationFilter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * A simple fixed size {@link MessageList} where there is a single, fixed size @@ -66,7 +63,7 @@ public class SimpleMessageList implements MessageList { } } - public List getMessages(Subscription sub) { + public List getMessages(ActiveMQDestination destination) { return getList(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java index c893670c02..ac3c36f7e4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java @@ -112,7 +112,16 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess if(!subscriberContainer.containsKey(key)){ subscriberContainer.put(key,info); } - addSubscriberMessageContainer(key); + ListContainer container=addSubscriberMessageContainer(key); + if(retroactive){ + for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); + ConsumerMessageRef ref=new ConsumerMessageRef(); + ref.setAckEntry(entry); + ref.setMessageEntry(tsa.getMessageEntry()); + container.add(ref); + } + } } public synchronized void deleteSubscription(String clientId,String subscriptionName){ @@ -207,12 +216,13 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess return result; } - protected void addSubscriberMessageContainer(Object key) throws IOException{ + protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{ ListContainer container=store.getListContainer(key,"topic-subs"); Marshaller marshaller=new ConsumerMessageRefMarshaller(); container.setMarshaller(marshaller); TopicSubContainer tsc=new TopicSubContainer(container); subscriberMessages.put(key,tsc); + return container; } public int getMessageCount(String clientId,String subscriberName) throws IOException{ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index cc6ff29e80..37798e5463 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -37,6 +38,7 @@ public class MemoryMessageStore implements MessageStore{ protected final ActiveMQDestination destination; protected final Map messageTable; + protected MessageId lastBatchId; public MemoryMessageStore(ActiveMQDestination destination){ this(destination,new LinkedHashMap()); @@ -115,12 +117,32 @@ public class MemoryMessageStore implements MessageStore{ return messageTable.size(); } - public void resetBatching(MessageId nextToDispatch){ - } - + public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + synchronized(messageTable){ + + boolean pastLackBatch=lastBatchId==null; + int count = 0; + for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ + Map.Entry entry=(Entry)iter.next(); + if(pastLackBatch){ + count++; + Object msg=entry.getValue(); + lastBatchId = (MessageId)entry.getKey(); + if(msg.getClass()==String.class){ + listener.recoverMessageReference((String)msg); + }else{ + listener.recoverMessage((Message)msg); + } + }else{ + pastLackBatch=entry.getKey().equals(lastBatchId); + } + } + listener.finished(); + } } public void resetBatching(){ + lastBatchId = null; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java index d2982fcb4d..e6f53f6945 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java @@ -37,10 +37,11 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic private Map ackDatabase; private Map subscriberDatabase; + private Map batchDatabase; MessageId lastMessageId; public MemoryTopicMessageStore(ActiveMQDestination destination){ - this(destination,new LinkedHashMap(),makeMap(),makeMap()); + this(destination,new LinkedHashMap(),makeMap(),makeMap(),makeMap()); } protected static Map makeMap(){ @@ -48,10 +49,11 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic } public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase, - Map ackDatabase){ + Map ackDatabase, Map batchDatabase){ super(destination,messageTable); this.subscriberDatabase=subscriberDatabase; this.ackDatabase=ackDatabase; + this.batchDatabase=batchDatabase; } public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ @@ -110,13 +112,10 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic } listener.finished(); } + } - public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned, - MessageRecoveryListener listener) throws Exception{ - listener.finished(); - } - + public void delete(){ super.delete(); ackDatabase.clear(); @@ -128,14 +127,6 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]); } - public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{ - return null; - } - - public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) - throws IOException{ - return null; - } public int getMessageCount(String clientId,String subscriberName) throws IOException{ int result=0; @@ -143,24 +134,56 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic // the message table is a synchronizedMap - so just have to synchronize here synchronized(messageTable){ result=messageTable.size(); - for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ - Map.Entry entry=(Entry)iter.next(); - if(entry.getKey().equals(lastAck)){ - break; + if(lastAck!=null){ + for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ + Map.Entry entry=(Entry)iter.next(); + if(entry.getKey().equals(lastAck)){ + break; + } + result--; } - result--; } } return result; } - public void resetBatching(String clientId,String subscriptionName,MessageId id){ - } - + public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, MessageRecoveryListener listener) throws Exception{ + SubscriptionKey key = new SubscriptionKey(clientId,subscriptionName); + MessageId lastBatch = (MessageId)batchDatabase.get(key); + if (lastBatch==null) { + //if last batch null - start from last ack + lastBatch = (MessageId)ackDatabase.get(key); + } + boolean pastLackBatch=lastBatch==null; + MessageId lastId = null; + // the message table is a synchronizedMap - so just have to synchronize here + int count = 0; + synchronized(messageTable){ + for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() &&count < maxReturned ;){ + Map.Entry entry=(Entry)iter.next(); + if(pastLackBatch){ + count++; + Object msg=entry.getValue(); + lastId = (MessageId)entry.getKey(); + if(msg.getClass()==String.class){ + listener.recoverMessageReference((String)msg); + }else{ + listener.recoverMessage((Message)msg); + } + }else{ + pastLackBatch=entry.getKey().equals(lastBatch); + } + } + if (lastId != null) { + batchDatabase.put(key,lastId); + } + listener.finished(); + } } public void resetBatching(String clientId,String subscriptionName){ + batchDatabase.remove(new SubscriptionKey(clientId,subscriptionName)); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java index 333bad15be..ca08545529 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java @@ -120,7 +120,16 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe if(!subscriberContainer.containsKey(key)){ subscriberContainer.put(key,info); } - addSubscriberMessageContainer(key); + ListContainer container=addSubscriberMessageContainer(key); + if(retroactive){ + for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); + ConsumerMessageRef ref=new ConsumerMessageRef(); + ref.setAckEntry(entry); + ref.setMessageEntry(tsa.getMessageEntry()); + container.add(ref); + } + } } public synchronized void deleteSubscription(String clientId,String subscriptionName){ @@ -204,12 +213,13 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe return result; } - protected void addSubscriberMessageContainer(Object key) throws IOException{ + protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{ ListContainer container=store.getListContainer(key,"topic-subs"); Marshaller marshaller=new ConsumerMessageRefMarshaller(); container.setMarshaller(marshaller); TopicSubContainer tsc=new TopicSubContainer(container); subscriberMessages.put(key,tsc); + return container; } public int getMessageCount(String clientId,String subscriberName) throws IOException{