From 2748ae1612190838a74e653e2ecbf75e5e506446 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 28 Feb 2006 12:55:21 +0000 Subject: [PATCH] Added support for browsing Topics git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@381644 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/jmx/QueueView.java | 112 +++------------- .../apache/activemq/broker/jmx/TopicView.java | 58 ++------- .../activemq/broker/jmx/TopicViewMBean.java | 7 + .../activemq/broker/region/Destination.java | 2 + .../apache/activemq/broker/region/Queue.java | 3 + .../apache/activemq/broker/region/Topic.java | 35 ++++- .../FixedCountSubscriptionRecoveryPolicy.java | 120 ++++++++++-------- .../FixedSizedSubscriptionRecoveryPolicy.java | 9 ++ .../LastImageSubscriptionRecoveryPolicy.java | 14 ++ .../policy/NoSubscriptionRecoveryPolicy.java | 6 + .../QueryBasedSubscriptionRecoveryPolicy.java | 4 + .../policy/SubscriptionRecoveryPolicy.java | 18 ++- .../TimedSubscriptionRecoveryPolicy.java | 31 ++++- .../list/DestinationBasedMessageList.java | 14 +- .../activemq/memory/list/MessageList.java | 10 ++ .../memory/list/SimpleMessageList.java | 31 ++++- .../store/MessageRecoveryListener.java | 1 + .../jdbc/JDBCMessageRecoveryListener.java | 1 + .../activemq/store/jdbc/JDBCMessageStore.java | 3 + .../store/jdbc/JDBCTopicMessageStore.java | 4 + .../jdbc/adapter/DefaultJDBCAdapter.java | 6 +- .../journal/QuickJournalMessageStore.java | 4 + .../QuickJournalTopicMessageStore.java | 4 + .../store/memory/MemoryMessageStore.java | 1 + .../store/memory/MemoryTopicMessageStore.java | 1 + 25 files changed, 291 insertions(+), 208 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java index e7091f08a6..23cc4ffa69 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java @@ -1,111 +1,39 @@ /** - * + * * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.broker.jmx; import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - -import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.Message; - -public class QueueView implements QueueViewMBean { - - private final Queue destination; - - public QueueView(Queue destination) { - this.destination = destination; +public class QueueView extends DestinationView implements QueueViewMBean{ + public QueueView(Queue destination){ + super(destination); } - public void gc() { - destination.gc(); - } - public void resetStatistics() { - destination.getDestinationStatistics().reset(); - } - - public long getEnqueueCount() { - return destination.getDestinationStatistics().getEnqueues().getCount(); - - } - public long getDequeueCount() { - return destination.getDestinationStatistics().getDequeues().getCount(); - } - - public long getConsumerCount() { - return destination.getDestinationStatistics().getConsumers().getCount(); - } - - public long getMessages() { - return destination.getDestinationStatistics().getMessages().getCount(); - } - - public long getMessagesCached() { - return destination.getDestinationStatistics().getMessagesCached().getCount(); - } - - public CompositeData[] browse() throws OpenDataException { - Message[] messages = destination.browse(); - CompositeData c[] = new CompositeData[messages.length]; - for (int i = 0; i < c.length; i++) { - try { - c[i] = OpenTypeSupport.convert(messages[i]); - } catch (Throwable e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - return c; - } - - public TabularData browseAsTable() throws OpenDataException { - OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); - - Message[] messages = destination.browse(); - CompositeType ct = factory.getCompositeType(); - TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[]{"JMSMessageID"}); - TabularDataSupport rc = new TabularDataSupport(tt); - for (int i = 0; i < messages.length; i++) { - rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); - } - - return rc; - } - - - public CompositeData getMessage(String messageId) throws OpenDataException { - Message rc = destination.getMessage(messageId); - if( rc ==null ) + public CompositeData getMessage(String messageId) throws OpenDataException{ + Message rc=((Queue) destination).getMessage(messageId); + if(rc==null) return null; return OpenTypeSupport.convert(rc); } - - public void removeMessage(String messageId) { - destination.removeMessage(messageId); + + public void removeMessage(String messageId){ + ((Queue) destination).removeMessage(messageId); } - public void purge() { - destination.purge(); + public void purge(){ + ((Queue) destination).purge(); } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java index 4aa5eca45d..619d2c7c02 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java @@ -1,56 +1,22 @@ /** - * + * * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.broker.jmx; import org.apache.activemq.broker.region.Topic; - -public class TopicView implements TopicViewMBean { - - private final Topic destination; - - public TopicView(Topic destination) { - this.destination = destination; - } - - public void gc() { - destination.gc(); - } - public void resetStatistics() { - destination.getDestinationStatistics().reset(); - } - - public long getEnqueueCount() { - return destination.getDestinationStatistics().getEnqueues().getCount(); +public class TopicView extends DestinationView implements TopicViewMBean{ + public TopicView(Topic destination){ + super(destination); } - public long getDequeueCount() { - return destination.getDestinationStatistics().getDequeues().getCount(); - } - - public long getConsumerCount() { - return destination.getDestinationStatistics().getConsumers().getCount(); - } - - public long getMessages() { - return destination.getDestinationStatistics().getMessages().getCount(); - } - - public long getMessagesCached() { - return destination.getDestinationStatistics().getMessagesCached().getCount(); - } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java index 8cdeabb935..ce7ff33298 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.broker.jmx; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; + public interface TopicViewMBean { @@ -27,5 +31,8 @@ public interface TopicViewMBean { public long getConsumerCount(); public long getMessages(); public long getMessagesCached(); + + public CompositeData[] browse() throws OpenDataException; + public TabularData browseAsTable() throws OpenDataException; } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index 8032099302..d1b5e3095f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -52,4 +52,6 @@ public interface Destination extends Service { DestinationStatistics getDestinationStatistics(); MessageStore getMessageStore(); DeadLetterStrategy getDeadLetterStrategy(); + + public Message[] browse(); } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 01ed2332ef..cae8332e0f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -98,6 +98,9 @@ public class Queue implements Destination { public void recoverMessageReference(String messageReference) throws Throwable { throw new RuntimeException("Should not be called."); } + + public void finished(){ + } }); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 01a2a7bcde..7562d726e0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -17,7 +17,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; - +import java.util.Set; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; @@ -41,9 +41,11 @@ import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.Valve; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.util.SubscriptionKey; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; /** * The Topic is a destination that sends a copy of a message to every active @@ -52,7 +54,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; * @version $Revision: 1.21 $ */ public class Topic implements Destination { - + private static final Log log = LogFactory.getLog(Topic.class); protected final ActiveMQDestination destination; protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); protected final Valve dispatchValve = new Valve(true); @@ -196,6 +198,9 @@ public class Topic implements Destination { public void recoverMessageReference(String messageReference) throws Throwable { throw new RuntimeException("Should not be called."); } + + public void finished(){ + } }); if( true && subscription.getConsumerInfo().isRetroactive() ) { @@ -290,6 +295,30 @@ public class Topic implements Destination { public void stop() throws Exception { this.subscriptionRecoveryPolicy.stop(); } + + public Message[] browse(){ + final Set result=new CopyOnWriteArraySet(); + try{ + store.recover(new MessageRecoveryListener(){ + public void recoverMessage(Message message) throws Throwable{ + result.add(message); + } + + public void recoverMessageReference(String messageReference) throws Throwable{} + + public void finished(){} + }); + Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination()); + if(msgs!=null){ + for(int i=0;i= messages.length ) - tail = 0; + synchronized public boolean add(ConnectionContext context,MessageReference node) throws Throwable{ + messages[tail++]=node; + if(tail>=messages.length) + tail=0; return true; } - synchronized public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable { + synchronized public void recover(ConnectionContext context,Topic topic,Subscription sub) throws Throwable{ // Re-dispatch the last message seen. - int t = tail; + int t=tail; // The buffer may not have rolled over yet..., start from the front - if( messages[t]==null ) + if(messages[t]==null) t=0; // Well the buffer is really empty then. - if( messages[t]==null ) + if(messages[t]==null) return; - // Keep dispatching until t hit's tail again. - MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); - do { - MessageReference node = messages[t]; - try { - msgContext.setDestination(node.getRegionDestination().getActiveMQDestination()); - msgContext.setMessageReference(node); - if (sub.matches(node, msgContext)) { - sub.add(node); - } - } finally { - msgContext.clear(); - } - t++; - if( t >= messages.length ) - t = 0; - } while( t!=tail ); - + MessageEvaluationContext msgContext=context.getMessageEvaluationContext(); + do{ + MessageReference node=messages[t]; + try{ + msgContext.setDestination(node.getRegionDestination().getActiveMQDestination()); + msgContext.setMessageReference(node); + if(sub.matches(node,msgContext)){ + sub.add(node); + } + }finally{ + msgContext.clear(); + } + t++; + if(t>=messages.length) + t=0; + }while(t!=tail); } - public void start() throws Exception { - messages = new MessageReference[maximumSize]; + public void start() throws Exception{ + messages=new MessageReference[maximumSize]; } - public void stop() throws Exception { - messages = null; + public void stop() throws Exception{ + messages=null; } - - public int getMaximumSize() { + + public int getMaximumSize(){ return maximumSize; } /** * Sets the maximum number of messages that this destination will hold around in RAM */ - public void setMaximumSize(int maximumSize) { - this.maximumSize = maximumSize; + public void setMaximumSize(int maximumSize){ + this.maximumSize=maximumSize; } - + public Message[] browse(ActiveMQDestination destination) throws Throwable{ + List result=new ArrayList(); + DestinationFilter filter=DestinationFilter.parseFilter(destination); + int t=tail; + if(messages[t]==null) + t=0; + if(messages[t]!=null){ + do{ + MessageReference ref=messages[t]; + Message message=ref.getMessage(); + if(filter.matches(message.getDestination())){ + result.add(message); + } + t++; + if(t>=messages.length) + t=0; + }while(t!=tail); + } + return (Message[]) result.toArray(new Message[result.size()]); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java index 639dbba9bb..6c10c8eab1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java @@ -20,11 +20,16 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy.TimestampWrapper; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.memory.list.DestinationBasedMessageList; import org.apache.activemq.memory.list.MessageList; import org.apache.activemq.memory.list.SimpleMessageList; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -104,6 +109,10 @@ public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecover public void setUseSharedBuffer(boolean useSharedBuffer) { this.useSharedBuffer = useSharedBuffer; } + + public Message[] browse(ActiveMQDestination destination) throws Throwable{ + return buffer.browse(destination); + } // Implementation methods diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java index a3e392c229..20cdcbdf91 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java @@ -16,10 +16,15 @@ */ package org.apache.activemq.broker.region.policy; +import java.util.ArrayList; +import java.util.List; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; /** @@ -62,4 +67,13 @@ public class LastImageSubscriptionRecoveryPolicy implements SubscriptionRecovery public void stop() throws Exception { } + public Message[] browse(ActiveMQDestination destination) throws Throwable{ + List result = new ArrayList(); + DestinationFilter filter=DestinationFilter.parseFilter(destination); + if (filter.matches(lastImage.getMessage().getDestination())){ + result.add(lastImage.getMessage()); + } + return (Message[])result.toArray(new Message[result.size()]); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java index aa0b0f9924..cbdc2d26b5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java @@ -20,6 +20,8 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; /** * This is the default Topic recovery policy which does not recover any messages. @@ -43,4 +45,8 @@ public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy public void stop() throws Exception { } + public Message[] browse(ActiveMQDestination dest) throws Throwable{ + return new Message[0]; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java index e66d340bcb..0b3f195105 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java @@ -98,6 +98,10 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover public void setQuery(MessageQuery query) { this.query = query; } + + public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Throwable{ + return new org.apache.activemq.command.Message[0]; + } protected void dispatchInitialMessage(Message message, Destination regionDestination, MessageEvaluationContext msgContext, Subscription sub) { try { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java index ab9f36189a..e23fb28599 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java @@ -16,11 +16,14 @@ */ package org.apache.activemq.broker.region.policy; + import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; /** * Abstraction to allow different recovery policies to be plugged @@ -35,8 +38,9 @@ public interface SubscriptionRecoveryPolicy extends Service { * A message was sent to the destination. * * @param context + * @param message * @param node - * @return TODO + * @return true if successful * @throws Throwable */ boolean add(ConnectionContext context, MessageReference message) throws Throwable; @@ -45,11 +49,19 @@ public interface SubscriptionRecoveryPolicy extends Service { * Let a subscription recover message held by the policy. * * @param context - * @param topic TODO - * @param topic + * @param topic + * @param sub * @param node * @throws Throwable */ void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable; + + + /** + * @param dest + * @return messages + * @throws Throwable + */ + Message[] browse(ActiveMQDestination dest) throws Throwable; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java index edba608d5a..58054c2196 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java @@ -16,18 +16,20 @@ */ package org.apache.activemq.broker.region.policy; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.Topic; -import org.apache.activemq.filter.MessageEvaluationContext; -import org.apache.activemq.thread.Scheduler; - import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.DestinationFilter; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.thread.Scheduler; /** * This implementation of {@link SubscriptionRecoveryPolicy} will keep a timed @@ -122,5 +124,20 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli public void setRecoverDuration(long recoverDuration) { this.recoverDuration = recoverDuration; } + + public Message[] browse(ActiveMQDestination destination) throws Throwable{ + List result = new ArrayList(); + ArrayList copy = new ArrayList(buffer); + DestinationFilter filter=DestinationFilter.parseFilter(destination); + for (Iterator iter = copy.iterator(); iter.hasNext();) { + TimestampWrapper timestampWrapper = (TimestampWrapper) iter.next(); + MessageReference ref = timestampWrapper.message; + Message message=ref.getMessage(); + if (filter.matches(message.getDestination())){ + result.add(message); + } + } + return (Message[]) result.toArray(new Message[result.size()]); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java b/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java index e74cb60ef6..0ec22e8b9c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java @@ -22,11 +22,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.Message; import org.apache.activemq.filter.DestinationMap; import org.apache.activemq.memory.buffer.MessageBuffer; import org.apache.activemq.memory.buffer.MessageQueue; @@ -70,9 +70,13 @@ public class DestinationBasedMessageList implements MessageList { } public List getMessages(Subscription sub) { + return getMessages(sub.getConsumerInfo().getDestination()); + } + + protected List getMessages(ActiveMQDestination destination) { Set set = null; synchronized (lock) { - set = subscriptionIndex.get(sub.getConsumerInfo().getDestination()); + set = subscriptionIndex.get(destination); } List answer = new ArrayList(); for (Iterator iter = set.iterator(); iter.hasNext();) { @@ -81,6 +85,12 @@ public class DestinationBasedMessageList implements MessageList { } return answer; } + + public Message[] browse(ActiveMQDestination destination) { + List result = getMessages(destination); + return (Message[])result.toArray(new Message[result.size()]); + } + public void clear() { messageBuffer.clear(); diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java b/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java index af69f92653..558b3fffe7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java @@ -18,6 +18,8 @@ package org.apache.activemq.memory.list; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; import java.util.List; @@ -35,6 +37,14 @@ public interface MessageList { * Returns the current list of MessageReference objects for the given subscription */ List getMessages(Subscription sub); + + /** + * @param destination + * @return an array of Messages that match the destination + */ + Message[] browse(ActiveMQDestination destination); void clear(); + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java b/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java index c758d25f03..7aceca27ff 100644 --- a/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java @@ -18,8 +18,16 @@ package org.apache.activemq.memory.list; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.DestinationFilter; +import org.apache.activemq.network.DemandForwardingBridge; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -32,7 +40,7 @@ import java.util.List; * @version $Revision: 1.1 $ */ public class SimpleMessageList implements MessageList { - + static final private Log log=LogFactory.getLog(SimpleMessageList.class); private LinkedList list = new LinkedList(); private int maximumSize = 100 * 64 * 1024; private int size; @@ -60,6 +68,27 @@ public class SimpleMessageList implements MessageList { public List getMessages(Subscription sub) { return getList(); } + + public Message[] browse(ActiveMQDestination destination) { + List result = new ArrayList(); + DestinationFilter filter=DestinationFilter.parseFilter(destination); + synchronized(lock){ + for (Iterator i = list.iterator(); i.hasNext();){ + MessageReference ref = (MessageReference)i.next(); + Message msg; + try{ + msg=ref.getMessage(); + if (filter.matches(msg.getDestination())){ + result.add(msg); + } + }catch(IOException e){ + log.error("Failed to get Message from MessageReference: " + ref,e); + } + + } + } + return (Message[])result.toArray(new Message[result.size()]); + } /** * Returns a copy of the list diff --git a/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java b/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java index 69bf004b08..7cca12e4d9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java @@ -24,4 +24,5 @@ import org.apache.activemq.command.Message; public interface MessageRecoveryListener { void recoverMessage(Message message) throws Throwable; void recoverMessageReference(String messageReference) throws Throwable; + void finished(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java index ee15e26323..c06981e39d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java @@ -25,4 +25,5 @@ import java.io.IOException; public interface JDBCMessageRecoveryListener { void recoverMessage(long sequenceId, byte[] message) throws IOException, Throwable; void recoverMessageReference(String reference) throws IOException, Throwable; + void finished(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index c41458faba..4a79c42d52 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -158,6 +158,9 @@ public class JDBCMessageStore implements MessageStore { public void recoverMessageReference(String reference) throws IOException, Throwable { listener.recoverMessageReference(reference); } + public void finished(){ + listener.finished(); + } }); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ",e); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index c720f8aa57..c0b0de6a61 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -75,6 +75,10 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess public void recoverMessageReference(String reference) throws IOException, Throwable { listener.recoverMessageReference(reference); } + + public void finished(){ + listener.finished(); + } }); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ",e); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 9c3f0370f6..ab59c0c812 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -337,12 +337,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter { listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2)); } } - } finally { close(rs); close(s); - } + listener.finished(); + } } @@ -402,7 +402,9 @@ public class DefaultJDBCAdapter implements JDBCAdapter { finally { close(rs); close(s); + listener.finished(); } + } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java index 13a45840dd..a431db02d2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java @@ -360,6 +360,10 @@ public class QuickJournalMessageStore implements MessageStore { Message message = (Message) peristenceAdapter.readCommand(loc); listener.recoverMessage(message); } + + public void finished(){ + listener.finished(); + } }); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java index a6e7b65d00..c45e7e4817 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java @@ -63,6 +63,10 @@ public class QuickJournalTopicMessageStore extends QuickJournalMessageStore impl Message message = (Message) peristenceAdapter.readCommand(loc); listener.recoverMessage(message); } + + public void finished(){ + listener.finished(); + } }); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 66017dcfb8..73fc97ded4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -82,6 +82,7 @@ public class MemoryMessageStore implements MessageStore { listener.recoverMessage((Message) msg); } } + listener.finished(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java index 3924d159f9..1a6177c8a3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java @@ -107,6 +107,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic pastLastAck=entry.getKey().equals(lastAck); } } + listener.finished(); } }