- Refactored out introspection based toString() logic to the IntrospectionSupport class

- Durable subscriptions are now eagerly loaded when a topic is created.
- Fixed the *TransactionTest 

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@366220 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-01-05 17:09:05 +00:00
parent b543609252
commit 013f37294e
11 changed files with 163 additions and 67 deletions

View File

@ -31,6 +31,8 @@ import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@ -40,6 +42,8 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
*/
abstract public class AbstractRegion implements Region {
private static final Log log = LogFactory.getLog(AbstractRegion.class);
protected final ConcurrentHashMap destinations = new ConcurrentHashMap();
protected final DestinationMap destinationMap = new DestinationMap();
protected final ConcurrentHashMap subscriptions = new ConcurrentHashMap();
@ -57,7 +61,14 @@ abstract public class AbstractRegion implements Region {
this.persistenceAdapter = persistenceAdapter;
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
log.debug("Adding destination: "+destination);
Destination dest = createDestination(destination);
dest.start();
synchronized(destinationsMutex){
@ -86,6 +97,7 @@ abstract public class AbstractRegion implements Region {
}
}
log.debug("Removing destination: "+destination);
synchronized(destinationsMutex){
Destination dest=(Destination) destinations.remove(destination);
if(dest==null)

View File

@ -36,7 +36,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
abstract public class AbstractSubscription implements Subscription {
protected final Log log;
static private final Log log = LogFactory.getLog(AbstractSubscription.class);
protected ConnectionContext context;
protected ConsumerInfo info;
@ -50,7 +50,6 @@ abstract public class AbstractSubscription implements Subscription {
this.info = info;
this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
this.selector = parseSelector(info);
this.log = LogFactory.getLog(getClass().getName()+"."+info.getConsumerId());
}
static private BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {

View File

@ -26,6 +26,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.SubscriptionInfo;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@ -44,6 +45,22 @@ public class DurableTopicSubscription extends PrefetchSubscription {
this.subscriptionName = info.getSubcriptionName();
}
public DurableTopicSubscription(SubscriptionInfo info) throws InvalidSelectorException {
super(null, createFakeConsumerInfo(info));
this.clientId = info.getClientId();
this.subscriptionName = info.getSubcriptionName();
active=false;
recovered=false;
}
private static ConsumerInfo createFakeConsumerInfo(SubscriptionInfo info) {
ConsumerInfo rc = new ConsumerInfo();
rc.setSelector(info.getSelector());
rc.setSubcriptionName(info.getSubcriptionName());
rc.setDestination(info.getDestination());
return rc;
}
synchronized public boolean isActive() {
return active;
}

View File

@ -25,6 +25,8 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@ -39,6 +41,7 @@ import java.util.LinkedList;
* @version $Revision: 1.15 $
*/
abstract public class PrefetchSubscription extends AbstractSubscription {
static private final Log log = LogFactory.getLog(PrefetchSubscription.class);
final protected LinkedList matched = new LinkedList();
final protected LinkedList dispatched = new LinkedList();

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
@ -25,16 +26,16 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
/**
* A Region is used to implement the different QOS options available to
* a broker. A Broker is composed of multiple mesasge processing Regions that
* a broker. A Broker is composed of multiple message processing Regions that
* provide different QOS options.
*
* @version $Revision$
*/
public interface Region {
public interface Region extends Service {
/**
* Used to create a destination. Usually, this method is invoked as a side-effect of sending
* a message to a destiantion that does not exist yet.
* a message to a destination that does not exist yet.
*
* @param context
* @param destination the destination to create.
@ -43,11 +44,11 @@ public interface Region {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable;
/**
* Used to destory a destination.
* This shoud try to quiesce use of the destination up to the timeout alotted time before removing the destination.
* Used to destroy a destination.
* This should try to quiesce use of the destination up to the timeout allotted time before removing the destination.
* This will remove all persistent messages associated with the destination.
*
* @param context the enviorment the operation is being executed under.
* @param context the environment the operation is being executed under.
* @param destination what is being removed from the broker.
* @param timeout the max amount of time to wait for the destination to quiesce
*/
@ -55,19 +56,19 @@ public interface Region {
/**
* Adds a consumer.
* @param context the enviorment the operation is being executed under.
* @param context the environment the operation is being executed under.
*/
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable;
/**
* Removes a consumer.
* @param context the enviorment the operation is being executed under.
* @param context the environment the operation is being executed under.
*/
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable;
/**
* Deletes a durable subscription.
* @param context the enviorment the operation is being executed under.
* @param context the environment the operation is being executed under.
* @param info TODO
*/
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Throwable;
@ -76,13 +77,13 @@ public interface Region {
* Send a message to the broker to using the specified destination. The destination specified
* in the message does not need to match the destination the message is sent to. This is
* handy in case the message is being sent to a dead letter destination.
* @param context the enviorment the operation is being executed under.
* @param context the environment the operation is being executed under.
*/
public void send(ConnectionContext context, Message message) throws Throwable;
/**
* Used to acknowledge the receipt of a message by a client.
* @param context the enviorment the operation is being executed under.
* @param context the environment the operation is being executed under.
*/
public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable;

View File

@ -39,6 +39,7 @@ import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
@ -108,9 +109,19 @@ public class RegionBroker implements Broker {
public void start() throws Exception {
queueRegion.start();
topicRegion.start();
tempQueueRegion.start();
tempTopicRegion.start();
}
public void stop() throws Exception {
ServiceStopper ss = new ServiceStopper();
ss.stop(queueRegion);
ss.stop(topicRegion);
ss.stop(tempQueueRegion);
ss.stop(tempTopicRegion);
ss.throwFirstException();
}
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Throwable {

View File

@ -25,11 +25,14 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
@ -42,7 +45,8 @@ import java.util.Set;
* @version $Revision: 1.12 $
*/
public class TopicRegion extends AbstractRegion {
private static final Log log = LogFactory.getLog(TopicRegion.class);
protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
private final PolicyMap policyMap;
@ -78,7 +82,8 @@ public class TopicRegion extends AbstractRegion {
}
else {
// Change the consumer id key of the durable sub.
subscriptions.remove(sub.getConsumerInfo().getConsumerId());
if( sub.getConsumerInfo().getConsumerId()!=null )
subscriptions.remove(sub.getConsumerInfo().getConsumerId());
subscriptions.put(info.getConsumerId(), sub);
sub.activate(context, info);
}
@ -136,6 +141,16 @@ public class TopicRegion extends AbstractRegion {
TopicMessageStore store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
// Eagerly recover the durable subscriptions
if (store != null) {
SubscriptionInfo[] infos = store.getAllSubscriptions();
for (int i = 0; i < infos.length; i++) {
log.info("Restoring durable subscription: "+infos[i]);
createDurableSubscription(infos[i]);
}
}
return topic;
}
@ -165,6 +180,15 @@ public class TopicRegion extends AbstractRegion {
return new TopicSubscription(context, info, memoryManager);
}
}
public Subscription createDurableSubscription(SubscriptionInfo info) throws JMSException {
SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
sub = new DurableTopicSubscription(info);
durableSubscriptions.put(key, sub);
return sub;
}
/**
*/

View File

@ -16,10 +16,7 @@
*/
package org.apache.activemq.command;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.LinkedHashMap;
import org.apache.activemq.util.IntrospectionSupport;
/**
*
@ -87,50 +84,7 @@ abstract public class BaseCommand implements Command {
}
public String toString() {
LinkedHashMap map = new LinkedHashMap();
addFields(map, getClass());
return simpleName(getClass())+" "+map;
return IntrospectionSupport.toString(this, BaseCommand.class);
}
public static String simpleName(Class clazz) {
String name = clazz.getName();
int p = name.lastIndexOf(".");
if( p >= 0 ) {
name = name.substring(p+1);
}
return name;
}
private void addFields(LinkedHashMap map, Class clazz) {
if( clazz!=BaseCommand.class )
addFields( map, clazz.getSuperclass() );
Field[] fields = clazz.getDeclaredFields();
for (int i = 0; i < fields.length; i++) {
Field field = fields[i];
if( Modifier.isStatic(field.getModifiers()) ||
Modifier.isTransient(field.getModifiers()) ||
Modifier.isPrivate(field.getModifiers()) ) {
continue;
}
try {
Object o = field.get(this);
if( o!=null && o.getClass().isArray() ) {
try {
o = Arrays.asList((Object[]) o);
} catch (Throwable e) {
}
}
map.put(field.getName(), o);
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.command;
import org.apache.activemq.util.IntrospectionSupport;
/**
*
@ -82,4 +84,9 @@ public class SubscriptionInfo implements DataStructure {
public boolean isMarshallAware() {
return false;
}
public String toString() {
return IntrospectionSupport.toString(this);
}
}

View File

@ -18,11 +18,15 @@ package org.apache.activemq.util;
import java.beans.PropertyEditor;
import java.beans.PropertyEditorManager;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
@ -138,5 +142,56 @@ public class IntrospectionSupport {
return false;
}
static public String toString(Object target) {
return toString(target, Object.class);
}
static public String toString(Object target, Class stopClass) {
LinkedHashMap map = new LinkedHashMap();
addFields(target, target.getClass(), stopClass, map);
return simpleName(target.getClass())+" "+map;
}
static public String simpleName(Class clazz) {
String name = clazz.getName();
int p = name.lastIndexOf(".");
if( p >= 0 ) {
name = name.substring(p+1);
}
return name;
}
static private void addFields(Object target, Class startClass, Class stopClass, LinkedHashMap map) {
if( startClass!=stopClass )
addFields( target, startClass.getSuperclass(), stopClass, map );
Field[] fields = startClass.getDeclaredFields();
for (int i = 0; i < fields.length; i++) {
Field field = fields[i];
if( Modifier.isStatic(field.getModifiers()) ||
Modifier.isTransient(field.getModifiers()) ||
Modifier.isPrivate(field.getModifiers()) ) {
continue;
}
try {
field.setAccessible(true);
Object o = field.get(target);
if( o!=null && o.getClass().isArray() ) {
try {
o = Arrays.asList((Object[]) o);
} catch (Throwable e) {
}
}
map.put(field.getName(), o);
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.test.JmsResourceProvider;
import org.apache.activemq.test.TestSupport;
@ -31,6 +33,9 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.MessageListener;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
@ -57,6 +62,8 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
private List ackMessages = new ArrayList(messageCount);
private boolean resendPhase = false;
private BrokerService broker;
public JmsTransactionTestSupport() {
super();
}
@ -70,7 +77,9 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
super.setUp();
broker = createBroker();
broker.start();
resourceProvider = getJmsResourceProvider();
topic = resourceProvider.isTopic();
// We will be using transacted sessions.
@ -79,17 +88,21 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
reconnect();
}
/**
*/
protected BrokerService createBroker() throws Exception, URISyntaxException {
return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
}
/* (non-Javadoc)
* @see junit.framework.TestCase#tearDown()
*/
protected void tearDown() throws Exception {
//TODO
//log.info("Test Done. Stats");
//((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
log.info("Closing down connection");
session.close();
connection.close();
broker.stop();
log.info("Connection closed.");
}