* added test case to demonstrate query-based subscription recovery policy in action.

* minor refactor to the SubscriptionRecoveryPolicy API to make it easy to generate messages from inside the recovery policy

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@357732 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2005-12-19 17:43:22 +00:00
parent 99ee7f85ef
commit 8b1f5a7202
12 changed files with 272 additions and 21 deletions

View File

@ -48,6 +48,11 @@ import org.activemq.command.ActiveMQTempTopic;
import org.activemq.command.ActiveMQTextMessage;
import org.activemq.command.ActiveMQTopic;
/**
* A helper class for converting normal JMS interfaces into ActiveMQ specific ones.
*
* @version $Revision: 1.1 $
*/
public class ActiveMQMessageTransformation {
/**

View File

@ -90,7 +90,7 @@ public class Topic implements Destination {
}
else {
if (sub.getConsumerInfo().isRetroactive()) {
subscriptionRecoveryPolicy.recover(context, sub);
subscriptionRecoveryPolicy.recover(context, this, sub);
}
consumers.add(sub);
}
@ -272,8 +272,9 @@ public class Topic implements Destination {
dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
subscriptionRecoveryPolicy.add(context, message);
if (! subscriptionRecoveryPolicy.add(context, message)) {
return;
}
if (consumers.isEmpty())
return;

View File

@ -24,6 +24,7 @@ import java.util.List;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
import org.activemq.broker.region.Topic;
import org.activemq.filter.MessageEvaluationContext;
import org.activemq.memory.list.DestinationBasedMessageList;
import org.activemq.memory.list.MessageList;
@ -44,11 +45,12 @@ public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecover
private int maximumSize = 100 * 64 * 1024;
private boolean useSharedBuffer = true;
public void add(ConnectionContext context, MessageReference message) throws Throwable {
public boolean add(ConnectionContext context, MessageReference message) throws Throwable {
buffer.add(message);
return true;
}
public void recover(ConnectionContext context, Subscription sub) throws Throwable {
public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
// Re-dispatch the messages from the buffer.
List copy = buffer.getMessages(sub);
if( !copy.isEmpty() ) {

View File

@ -21,6 +21,7 @@ package org.activemq.broker.region.policy;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
import org.activemq.broker.region.Topic;
import org.activemq.filter.MessageEvaluationContext;
/**
@ -35,11 +36,12 @@ public class LastImageSubscriptionRecoveryPolicy implements SubscriptionRecovery
volatile private MessageReference lastImage;
public void add(ConnectionContext context, MessageReference node) throws Throwable {
public boolean add(ConnectionContext context, MessageReference node) throws Throwable {
lastImage = node;
return true;
}
public void recover(ConnectionContext context, Subscription sub) throws Throwable {
public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
// Re-dispatch the last message seen.
MessageReference node = lastImage;
if( node != null ){

View File

@ -18,11 +18,12 @@
package org.activemq.broker.region.policy;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.Message;
import javax.jms.MessageListener;
/**
* Represents some kind of query which will load messages from some source.
* Represents some kind of query which will load initial messages from some source for a new topic subscriber.
*
* @version $Revision$
*/
@ -34,6 +35,20 @@ public interface MessageQuery {
* @param destination the destination on which the query is to be performed
* @param listener is the listener to notify as each message is created or loaded
*/
public void execute(ActiveMQDestination destination, MessageListener listener);
public void execute(ActiveMQDestination destination, MessageListener listener) throws Exception;
/**
* Returns true if the given update is valid and does not overlap with the initial message query.
* When performing an initial load from some source, there is a chance that an update may occur which is logically before
* the message sent on the initial load - so this method provides a hook where the query instance can keep track of the version IDs
* of the messages sent so that if an older version is sent as an update it can be excluded to avoid going backwards in time.
*
* e.g. if the execute() method creates version 2 of an object and then an update message is sent for version 1, this method should return false to
* hide the old update message.
*
* @param message the update message which may have been sent before the query actually completed
* @return true if the update message is valid otherwise false in which case the update message will be discarded.
*/
public boolean validateUpdate(Message message);
}

View File

@ -21,6 +21,7 @@ package org.activemq.broker.region.policy;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
import org.activemq.broker.region.Topic;
/**
* This is the default Topic recovery policy which does not recover any messages.
@ -31,10 +32,11 @@ import org.activemq.broker.region.Subscription;
*/
public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
public void add(ConnectionContext context, MessageReference node) throws Throwable {
public boolean add(ConnectionContext context, MessageReference node) throws Throwable {
return true;
}
public void recover(ConnectionContext context, Subscription sub) throws Throwable {
public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
}
public void start() throws Exception {

View File

@ -18,13 +18,22 @@
**/
package org.activemq.broker.region.policy;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
import org.activemq.ActiveMQMessageTransformation;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.Destination;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
import org.activemq.broker.region.Topic;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQMessage;
import org.activemq.command.ConnectionId;
import org.activemq.command.MessageId;
import org.activemq.command.ProducerId;
import org.activemq.command.SessionId;
import org.activemq.filter.MessageEvaluationContext;
import org.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -44,18 +53,25 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover
private static final Log log = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class);
private MessageQuery query;
private AtomicLong messageSequence = new AtomicLong(0);
private IdGenerator idGenerator = new IdGenerator();
private ProducerId producerId = createProducerId();
public void add(ConnectionContext context, MessageReference message) throws Throwable {
public QueryBasedSubscriptionRecoveryPolicy() {
}
public void recover(ConnectionContext context, final Subscription sub) throws Throwable {
public boolean add(ConnectionContext context, MessageReference message) throws Throwable {
return query.validateUpdate(message.getMessage());
}
public void recover(ConnectionContext context, final Topic topic, final Subscription sub) throws Throwable {
if (query != null) {
final MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
ActiveMQDestination destination = sub.getConsumerInfo().getDestination();
query.execute(destination, new MessageListener() {
public void onMessage(Message message) {
dispatchInitialMessage(message, msgContext, sub);
dispatchInitialMessage(message, topic, msgContext, sub);
}
});
}
@ -66,7 +82,7 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover
}
public void start() throws Exception {
if (query != null) {
if (query == null) {
throw new IllegalArgumentException("No query property configured");
}
}
@ -85,10 +101,17 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover
this.query = query;
}
protected void dispatchInitialMessage(Message message, MessageEvaluationContext msgContext, Subscription sub) {
protected void dispatchInitialMessage(Message message, Destination regionDestination, MessageEvaluationContext msgContext, Subscription sub) {
try {
ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null);
msgContext.setDestination(activeMessage.getDestination());
ActiveMQDestination destination = activeMessage.getDestination();
if (destination == null) {
destination = sub.getConsumerInfo().getDestination();
activeMessage.setDestination(destination);
}
activeMessage.setRegionDestination(regionDestination);
configure(activeMessage);
msgContext.setDestination(destination);
msgContext.setMessageReference(activeMessage);
if (sub.matches(activeMessage, msgContext)) {
sub.add(activeMessage);
@ -98,4 +121,18 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover
log.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e);
}
}
protected void configure(ActiveMQMessage msg) {
long sequenceNumber = messageSequence.incrementAndGet();
msg.setMessageId(new MessageId(producerId, sequenceNumber));
msg.onSend();
msg.setProducerId(producerId);
}
protected ProducerId createProducerId() {
String id = idGenerator.generateId();
ConnectionId connectionId = new ConnectionId(id);
SessionId sessionId = new SessionId(connectionId, 1);
return new ProducerId(sessionId, 1);
}
}

View File

@ -22,6 +22,7 @@ import org.activemq.Service;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
import org.activemq.broker.region.Topic;
/**
* Abstraction to allow different recovery policies to be plugged
@ -37,17 +38,20 @@ public interface SubscriptionRecoveryPolicy extends Service {
*
* @param context
* @param node
* @return TODO
* @throws Throwable
*/
void add(ConnectionContext context, MessageReference message) throws Throwable;
boolean add(ConnectionContext context, MessageReference message) throws Throwable;
/**
* Let a subscription recover message held by the policy.
*
* @param context
* @param topic TODO
* @param topic
* @param node
* @throws Throwable
*/
void recover(ConnectionContext context, Subscription sub) throws Throwable;
void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable;
}

View File

@ -27,6 +27,7 @@ import java.util.List;
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
import org.activemq.broker.region.Topic;
import org.activemq.filter.MessageEvaluationContext;
import org.activemq.thread.Scheduler;
@ -66,11 +67,12 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli
}
};
public void add(ConnectionContext context, MessageReference message) throws Throwable {
public boolean add(ConnectionContext context, MessageReference message) throws Throwable {
buffer.add(new TimestampWrapper(message, lastGCRun));
return true;
}
public void recover(ConnectionContext context, Subscription sub) throws Throwable {
public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
// Re-dispatch the messages from the buffer.
ArrayList copy = new ArrayList(buffer);

View File

@ -0,0 +1,47 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.test.retroactive;
import org.activemq.broker.region.policy.MessageQuery;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQTextMessage;
import org.activemq.command.Message;
import javax.jms.MessageListener;
/**
*
* @version $Revision$
*/
public class DummyMessageQuery implements MessageQuery {
public static int messageCount = 10;
public void execute(ActiveMQDestination destination, MessageListener listener) throws Exception {
System.out.println("Initial query is creating: " + messageCount + " messages");
for (int i = 0; i < messageCount; i++) {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("Initial message: " + i + " loaded from query");
listener.onMessage(message);
}
}
public boolean validateUpdate(Message message) {
return true;
}
}

View File

@ -0,0 +1,108 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.test.retroactive;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.EmbeddedBrokerTestSupport;
import org.activemq.broker.BrokerService;
import org.activemq.util.MessageList;
import org.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Date;
/**
*
* @version $Revision$
*/
public class RetroactiveConsumerWithMessageQueryTest extends EmbeddedBrokerTestSupport {
protected int messageCount = 20;
protected Connection connection;
protected Session session;
public void testConsumeAndReceiveInitialQueryBeforeUpdates() throws Exception {
// lets some messages
connection = createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
MessageConsumer consumer = session.createConsumer(destination);
MessageList listener = new MessageList();
listener.setVerbose(true);
consumer.setMessageListener(listener);
MessageProducer producer = session.createProducer(destination);
int updateMessageCount = messageCount - DummyMessageQuery.messageCount;
for (int i = 0; i < updateMessageCount; i++) {
TextMessage message = session.createTextMessage("Update Message: " + i + " sent at: " + new Date());
producer.send(message);
}
producer.close();
System.out.println("Sent: " + updateMessageCount + " update messages");
listener.assertMessagesReceived(messageCount);
}
protected void setUp() throws Exception {
useTopic = true;
bindAddress = "vm://localhost";
super.setUp();
}
protected void tearDown() throws Exception {
if (session != null) {
session.close();
session = null;
}
if (connection != null) {
connection.close();
}
super.tearDown();
}
protected ConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(bindAddress);
answer.setUseRetroactiveConsumer(true);
return answer;
}
protected BrokerService createBroker() throws Exception {
String uri = getBrokerXml();
System.out.println("Loading broker configuration from the classpath with URI: " + uri);
BrokerFactoryBean factory = new BrokerFactoryBean(new ClassPathResource(uri));
factory.afterPropertiesSet();
return factory.getBroker();
}
protected void startBroker() throws Exception {
// broker already started by XBean
}
protected String getBrokerXml() {
return "org/activemq/test/retroactive/activemq-message-query.xml";
}
}

View File

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: xbean -->
<beans xmlns="http://activemq.org/config/1.0"
xmlns:s="http://xbean.org/spring/">
<broker persistent="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="org.activemq.test.>">
<subscriptionRecoveryPolicy>
<queryBasedSubscriptionRecoveryPolicy query="#myQuery" />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
<bean id="myQuery"
class="org.activemq.test.retroactive.DummyMessageQuery" />
</beans>
<!-- END SNIPPET: xbean -->