From 7c7cca9dfb8eb503e0a8eee5f5b21a7c30d888df Mon Sep 17 00:00:00 2001 From: James Strachan Date: Mon, 19 Dec 2005 15:54:17 +0000 Subject: [PATCH] added a helper class and strategy method for people wishing to implement their own custom MessageQuery strategies to be fired as a new subscriber is created git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@357712 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/policy/MessageQuery.java | 39 +++++++ .../QueryBasedSubscriptionRecoveryPolicy.java | 101 ++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java create mode 100644 activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java diff --git a/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java b/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java new file mode 100644 index 0000000000..c9d7dbb617 --- /dev/null +++ b/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java @@ -0,0 +1,39 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.broker.region.policy; + +import org.activemq.command.ActiveMQDestination; + +import javax.jms.MessageListener; + +/** + * Represents some kind of query which will load messages from some source. + * + * @version $Revision$ + */ +public interface MessageQuery { + + /** + * Executes the query for messages; each message is passed into the listener + * + * @param destination the destination on which the query is to be performed + * @param listener is the listener to notify as each message is created or loaded + */ + public void execute(ActiveMQDestination destination, MessageListener listener); + +} diff --git a/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java new file mode 100644 index 0000000000..b007141dc5 --- /dev/null +++ b/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java @@ -0,0 +1,101 @@ +/** + * ActiveMQ: The Open Source Message Fabric + * + * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.broker.region.policy; + +import org.activemq.ActiveMQMessageTransformation; +import org.activemq.broker.ConnectionContext; +import org.activemq.broker.region.MessageReference; +import org.activemq.broker.region.Subscription; +import org.activemq.command.ActiveMQDestination; +import org.activemq.command.ActiveMQMessage; +import org.activemq.filter.MessageEvaluationContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.Message; +import javax.jms.MessageListener; + +/** + * This implementation of {@link SubscriptionRecoveryPolicy} will perform a user + * specific query mechanism to load any messages they may have missed. + * + * @org.xbean.XBean + * + * @version $Revision$ + */ +public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { + + private static final Log log = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class); + + private MessageQuery query; + + public void add(ConnectionContext context, MessageReference message) throws Throwable { + } + + public void recover(ConnectionContext context, final Subscription sub) throws Throwable { + if (query != null) { + final MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); + try { + ActiveMQDestination destination = sub.getConsumerInfo().getDestination(); + query.execute(destination, new MessageListener() { + public void onMessage(Message message) { + dispatchInitialMessage(message, msgContext, sub); + } + }); + } + finally { + msgContext.clear(); + } + } + } + + public void start() throws Exception { + if (query != null) { + throw new IllegalArgumentException("No query property configured"); + } + } + + public void stop() throws Exception { + } + + public MessageQuery getQuery() { + return query; + } + + /** + * Sets the query strategy to load initial messages + */ + public void setQuery(MessageQuery query) { + this.query = query; + } + + protected void dispatchInitialMessage(Message message, MessageEvaluationContext msgContext, Subscription sub) { + try { + ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null); + msgContext.setDestination(activeMessage.getDestination()); + msgContext.setMessageReference(activeMessage); + if (sub.matches(activeMessage, msgContext)) { + sub.add(activeMessage); + } + } + catch (Throwable e) { + log.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e); + } + } +}