r238@34: chirino | 2007-02-23 14:48:37 -0500

You can now disable a connection from watching topic advisories by setting the 'watchTopicAdvisories' option on the ActiveMQConnectionFactory to true.
 For large networks were lots of temporary topic consumers are being created and destroyed, this can result in lower overhead since those events do not need to get replicated to all the connections on the network. 
 
 This improves the handling of temp  destination over networks but it relaxed a few restrictions to get around timing issues with the networks.  If a message is sent to non-existant temp destination, the temp destination will be created so that the message is not dropped.  This could potentially create temp destinations for connections that will never get re-established.
 
 


git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@511082 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-02-23 20:23:35 +00:00
parent 88d56499d9
commit e04bd6f0e1
9 changed files with 233 additions and 110 deletions

View File

@ -129,6 +129,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer;
private boolean useSyncSend=false;
private boolean watchTopicAdvisories=true;
private int closeTimeout = 15000;
private final Transport transport;
@ -1267,7 +1268,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId());
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
if( watchTopicAdvisories ) {
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
}
}
@ -1602,7 +1605,16 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*/
public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
checkClosedOrFailed();
checkClosedOrFailed();
for(Iterator i=this.sessions.iterator();i.hasNext();){
ActiveMQSession s=(ActiveMQSession) i.next();
if( s.isInUse(destination) ) {
throw new JMSException("A consumer is consuming from the temporary destination");
}
}
activeTempDestinations.remove(destination);
DestinationInfo info = new DestinationInfo();
@ -1616,6 +1628,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public boolean isDeleted(ActiveMQDestination dest) {
// If we are not watching the advisories.. then
// we will assume that the temp destination does exist.
if( advisoryConsumer==null )
return false;
return !activeTempDestinations.contains(dest);
}
@ -1912,5 +1930,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setUseSyncSend(boolean forceSyncSend) {
this.useSyncSend = forceSyncSend;
}
public synchronized boolean isWatchTopicAdvisories() {
return watchTopicAdvisories;
}
public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
this.watchTopicAdvisories = watchTopicAdvisories;
}
}

View File

@ -86,7 +86,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean useRetroactiveConsumer;
private boolean nestedMapAndListEnabled = true;
private boolean useSyncSend=false;
private boolean watchTopicAdvisories=true;
JMSStatsImpl factoryStats = new JMSStatsImpl();
static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@ -259,7 +260,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setRedeliveryPolicy(getRedeliveryPolicy());
connection.setUseSyncSend(isUseSyncSend());
connection.setWatchTopicAdvisories(watchTopicAdvisories);
transport.start();
if( clientID !=null )
@ -519,7 +521,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
props.setProperty("useCompression", Boolean.toString(isUseCompression()));
props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
if (getUserName() != null) {
props.setProperty("userName", getUserName());
@ -696,4 +698,12 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public void setUseSyncSend(boolean forceSyncSend) {
this.useSyncSend = forceSyncSend;
}
public synchronized boolean isWatchTopicAdvisories() {
return watchTopicAdvisories;
}
public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
this.watchTopicAdvisories = watchTopicAdvisories;
}
}

View File

@ -30,6 +30,7 @@ import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
@ -908,4 +909,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return false;
}
public boolean isInUse(ActiveMQTempDestination destination) {
return info.getDestination().equals(destination);
}
}

View File

@ -57,6 +57,7 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
@ -1756,6 +1757,16 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
}
public boolean isInUse(ActiveMQTempDestination destination) {
for(Iterator iter=consumers.iterator();iter.hasNext();){
ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
if( c.isInUse(destination) ) {
return true;
}
}
return false;
}
}

View File

@ -62,7 +62,6 @@ import org.apache.activemq.util.ServiceStopper;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
/**
* Routes Broker operations to the correct messaging regions for processing.
@ -84,7 +83,7 @@ public class RegionBroker implements Broker {
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
private final CopyOnWriteArraySet destinations = new CopyOnWriteArraySet();
private final HashMap destinations = new HashMap();
private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList();
private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
@ -242,11 +241,14 @@ public class RegionBroker implements Broker {
}
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
if( destinations.contains(destination) ){
throw new JMSException("Destination already exists: "+destination);
}
Destination answer = null;
Destination answer;
synchronized(destinations) {
answer = (Destination) destinations.get(destination);
if( answer!=null )
return answer;
}
switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
answer = queueRegion.addDestination(context, destination);
@ -264,31 +266,33 @@ public class RegionBroker implements Broker {
throw createUnknownDestinationTypeException(destination);
}
destinations.add(destination);
return answer;
synchronized(destinations) {
destinations.put(destination, answer);
return answer;
}
}
public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
throws Exception{
if(destinations.contains(destination)){
switch(destination.getDestinationType()){
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeDestination(context,destination,timeout);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeDestination(context,destination,timeout);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeDestination(context,destination,timeout);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.removeDestination(context,destination,timeout);
break;
default:
throw createUnknownDestinationTypeException(destination);
}
destinations.remove(destination);
}
public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Exception{
synchronized(destinations) {
if( destinations.remove(destination)!=null ){
switch(destination.getDestinationType()){
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeDestination(context,destination,timeout);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeDestination(context,destination,timeout);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeDestination(context,destination,timeout);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.removeDestination(context,destination,timeout);
break;
default:
throw createUnknownDestinationTypeException(destination);
}
}
}
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
@ -302,7 +306,10 @@ public class RegionBroker implements Broker {
}
public ActiveMQDestination[] getDestinations() throws Exception {
ArrayList l = new ArrayList(destinations);
ArrayList l;
synchronized(destinations) {
l = new ArrayList(destinations.values());
}
ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
l.toArray(rc);
return rc;

View File

@ -18,7 +18,12 @@
package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -31,7 +36,26 @@ public class TempQueueRegion extends AbstractRegion {
public TempQueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
setAutoCreateDestinations(false);
// We should allow the following to be configurable via a Destination Policy
// setAutoCreateDestinations(false);
System.out.println("test");
}
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
if( !context.isNetworkConnection() && !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
}
super.addSubscription(context, sub);
};
};
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
@ -46,4 +70,15 @@ public class TempQueueRegion extends AbstractRegion {
return "TempQueueRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
// Force a timeout value so that we don't get an error that
// there is still an active sub. Temp destination may be removed
// while a network sub is still active which is valid.
if( timeout == 0 )
timeout = 1;
super.removeDestination(context, destination, timeout);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.activemq.broker.region;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -32,7 +33,8 @@ public class TempTopicRegion extends AbstractRegion {
public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
setAutoCreateDestinations(false);
// We should allow the following to be configurable via a Destination Policy
// setAutoCreateDestinations(false);
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
@ -47,5 +49,15 @@ public class TempTopicRegion extends AbstractRegion {
return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
// Force a timeout value so that we don't get an error that
// there is still an active sub. Temp destination may be removed
// while a network sub is still active which is valid.
if( timeout == 0 )
timeout = 1;
super.removeDestination(context, destination, timeout);
}
}

View File

@ -341,6 +341,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
lastConnectSucceeded.set(true);
serviceRemoteBrokerInfo(command);
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);
}else if(command.getClass() == ConnectionError.class ) {
ConnectionError ce = (ConnectionError) command;

View File

@ -535,81 +535,94 @@ public class BrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection1);
}
public void initCombosForTestTempDestinationsRemovedOnConnectionClose() {
addCombinationValues( "deliveryMode", new Object[]{
new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)} );
addCombinationValues( "destinationType", new Object[]{
new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
}
public void testTempDestinationsRemovedOnConnectionClose() throws Exception {
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo1);
//
// TODO: need to reimplement this since we don't fail when we send to a non-existant
// destination. But if we can access the Region directly then we should be able to
// check that if the destination was removed.
//
// public void initCombosForTestTempDestinationsRemovedOnConnectionClose() {
// addCombinationValues( "deliveryMode", new Object[]{
// new Integer(DeliveryMode.NON_PERSISTENT),
// new Integer(DeliveryMode.PERSISTENT)} );
// addCombinationValues( "destinationType", new Object[]{
// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
// }
//
// public void testTempDestinationsRemovedOnConnectionClose() throws Exception {
//
// // Setup a first connection
// StubConnection connection1 = createConnection();
// ConnectionInfo connectionInfo1 = createConnectionInfo();
// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
// connection1.send(connectionInfo1);
// connection1.send(sessionInfo1);
// connection1.send(producerInfo1);
//
// destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
//
// StubConnection connection2 = createConnection();
// ConnectionInfo connectionInfo2 = createConnectionInfo();
// SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
// ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
// connection2.send(connectionInfo2);
// connection2.send(sessionInfo2);
// connection2.send(producerInfo2);
//
// // Send from connection2 to connection1's temp destination. Should succeed.
// connection2.send(createMessage(producerInfo2, destination, deliveryMode));
//
// // Close connection 1
// connection1.request(closeConnectionInfo(connectionInfo1));
//
// try {
// // Send from connection2 to connection1's temp destination. Should not succeed.
// connection2.request(createMessage(producerInfo2, destination, deliveryMode));
// fail("Expected JMSException.");
// } catch ( JMSException success ) {
// }
//
// }
destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.send(producerInfo2);
// Send from connection2 to connection1's temp destination. Should succeed.
connection2.send(createMessage(producerInfo2, destination, deliveryMode));
// Close connection 1
connection1.request(closeConnectionInfo(connectionInfo1));
try {
// Send from connection2 to connection1's temp destination. Should not succeed.
connection2.request(createMessage(producerInfo2, destination, deliveryMode));
fail("Expected JMSException.");
} catch ( JMSException success ) {
}
}
public void initCombosForTestTempDestinationsAreNotAutoCreated() {
addCombinationValues( "deliveryMode", new Object[]{
new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)} );
addCombinationValues( "destinationType", new Object[]{
new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
}
public void testTempDestinationsAreNotAutoCreated() throws Exception {
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo1);
destination = ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1", destinationType);
// Should not be able to send to a non-existant temp destination.
try {
connection1.request(createMessage(producerInfo1, destination, deliveryMode));
fail("Expected JMSException.");
} catch ( JMSException success ) {
}
}
// public void initCombosForTestTempDestinationsAreNotAutoCreated() {
// addCombinationValues( "deliveryMode", new Object[]{
// new Integer(DeliveryMode.NON_PERSISTENT),
// new Integer(DeliveryMode.PERSISTENT)} );
// addCombinationValues( "destinationType", new Object[]{
// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
// }
//
//
// We create temp destination on demand now so this test case is no longer
// valid.
//
// public void testTempDestinationsAreNotAutoCreated() throws Exception {
//
// // Setup a first connection
// StubConnection connection1 = createConnection();
// ConnectionInfo connectionInfo1 = createConnectionInfo();
// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
// connection1.send(connectionInfo1);
// connection1.send(sessionInfo1);
// connection1.send(producerInfo1);
//
// destination = ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1", destinationType);
//
// // Should not be able to send to a non-existant temp destination.
// try {
// connection1.request(createMessage(producerInfo1, destination, deliveryMode));
// fail("Expected JMSException.");
// } catch ( JMSException success ) {
// }
//
// }
public void initCombosForTestTempDestinationsOnlyAllowsLocalConsumers() {
addCombinationValues( "deliveryMode", new Object[]{