From e04bd6f0e12496b861faad85a9e0e4de990948a0 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 23 Feb 2007 20:23:35 +0000 Subject: [PATCH] r238@34: chirino | 2007-02-23 14:48:37 -0500 You can now disable a connection from watching topic advisories by setting the 'watchTopicAdvisories' option on the ActiveMQConnectionFactory to true. For large networks were lots of temporary topic consumers are being created and destroyed, this can result in lower overhead since those events do not need to get replicated to all the connections on the network. This improves the handling of temp destination over networks but it relaxed a few restrictions to get around timing issues with the networks. If a message is sent to non-existant temp destination, the temp destination will be created so that the message is not dropped. This could potentially create temp destinations for connections that will never get re-established. git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@511082 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 32 +++- .../activemq/ActiveMQConnectionFactory.java | 16 +- .../activemq/ActiveMQMessageConsumer.java | 5 + .../org/apache/activemq/ActiveMQSession.java | 11 ++ .../activemq/broker/region/RegionBroker.java | 69 ++++---- .../broker/region/TempQueueRegion.java | 37 ++++- .../broker/region/TempTopicRegion.java | 14 +- .../DemandForwardingBridgeSupport.java | 2 + .../apache/activemq/broker/BrokerTest.java | 157 ++++++++++-------- 9 files changed, 233 insertions(+), 110 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index b2c9219037..4f3fcb3b3f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -129,6 +129,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean nestedMapAndListEnabled = true; private boolean useRetroactiveConsumer; private boolean useSyncSend=false; + private boolean watchTopicAdvisories=true; private int closeTimeout = 15000; private final Transport transport; @@ -1267,7 +1268,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // broker without having to do an RPC to the broker. ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId()); - advisoryConsumer = new AdvisoryConsumer(this, consumerId); + if( watchTopicAdvisories ) { + advisoryConsumer = new AdvisoryConsumer(this, consumerId); + } } @@ -1602,7 +1605,16 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon */ public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { - checkClosedOrFailed(); + checkClosedOrFailed(); + + for(Iterator i=this.sessions.iterator();i.hasNext();){ + ActiveMQSession s=(ActiveMQSession) i.next(); + if( s.isInUse(destination) ) { + throw new JMSException("A consumer is consuming from the temporary destination"); + } + } + + activeTempDestinations.remove(destination); DestinationInfo info = new DestinationInfo(); @@ -1616,6 +1628,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public boolean isDeleted(ActiveMQDestination dest) { + + // If we are not watching the advisories.. then + // we will assume that the temp destination does exist. + if( advisoryConsumer==null ) + return false; + return !activeTempDestinations.contains(dest); } @@ -1912,5 +1930,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void setUseSyncSend(boolean forceSyncSend) { this.useSyncSend = forceSyncSend; } + + + public synchronized boolean isWatchTopicAdvisories() { + return watchTopicAdvisories; + } + + + public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { + this.watchTopicAdvisories = watchTopicAdvisories; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index df40a8ec78..e7261c65fd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -86,7 +86,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private boolean useRetroactiveConsumer; private boolean nestedMapAndListEnabled = true; private boolean useSyncSend=false; - + private boolean watchTopicAdvisories=true; + JMSStatsImpl factoryStats = new JMSStatsImpl(); static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { @@ -259,7 +260,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); connection.setRedeliveryPolicy(getRedeliveryPolicy()); connection.setUseSyncSend(isUseSyncSend()); - + connection.setWatchTopicAdvisories(watchTopicAdvisories); + transport.start(); if( clientID !=null ) @@ -519,7 +521,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); props.setProperty("useCompression", Boolean.toString(isUseCompression())); props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); - + props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); if (getUserName() != null) { props.setProperty("userName", getUserName()); @@ -696,4 +698,12 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public void setUseSyncSend(boolean forceSyncSend) { this.useSyncSend = forceSyncSend; } + + public synchronized boolean isWatchTopicAdvisories() { + return watchTopicAdvisories; + } + + public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { + this.watchTopicAdvisories = watchTopicAdvisories; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a68aa9c62c..3379d7c6bb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -30,6 +30,7 @@ import javax.jms.MessageListener; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; @@ -908,4 +909,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return false; } + public boolean isInUse(ActiveMQTempDestination destination) { + return info.getDestination().equals(destination); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 444228a523..fc3a8c2fe5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -57,6 +57,7 @@ import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Command; @@ -1756,6 +1757,16 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } } + public boolean isInUse(ActiveMQTempDestination destination) { + for(Iterator iter=consumers.iterator();iter.hasNext();){ + ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next(); + if( c.isInUse(destination) ) { + return true; + } + } + return false; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index d2b9280650..30b6e384af 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -62,7 +62,6 @@ import org.apache.activemq.util.ServiceStopper; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; /** * Routes Broker operations to the correct messaging regions for processing. @@ -84,7 +83,7 @@ public class RegionBroker implements Broker { protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); - private final CopyOnWriteArraySet destinations = new CopyOnWriteArraySet(); + private final HashMap destinations = new HashMap(); private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList(); private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); @@ -242,11 +241,14 @@ public class RegionBroker implements Broker { } public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - if( destinations.contains(destination) ){ - throw new JMSException("Destination already exists: "+destination); - } - - Destination answer = null; + + Destination answer; + synchronized(destinations) { + answer = (Destination) destinations.get(destination); + if( answer!=null ) + return answer; + } + switch(destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: answer = queueRegion.addDestination(context, destination); @@ -264,31 +266,33 @@ public class RegionBroker implements Broker { throw createUnknownDestinationTypeException(destination); } - destinations.add(destination); - return answer; + synchronized(destinations) { + destinations.put(destination, answer); + return answer; + } } - public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) - throws Exception{ - if(destinations.contains(destination)){ - switch(destination.getDestinationType()){ - case ActiveMQDestination.QUEUE_TYPE: - queueRegion.removeDestination(context,destination,timeout); - break; - case ActiveMQDestination.TOPIC_TYPE: - topicRegion.removeDestination(context,destination,timeout); - break; - case ActiveMQDestination.TEMP_QUEUE_TYPE: - tempQueueRegion.removeDestination(context,destination,timeout); - break; - case ActiveMQDestination.TEMP_TOPIC_TYPE: - tempTopicRegion.removeDestination(context,destination,timeout); - break; - default: - throw createUnknownDestinationTypeException(destination); - } - destinations.remove(destination); - } + public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Exception{ + synchronized(destinations) { + if( destinations.remove(destination)!=null ){ + switch(destination.getDestinationType()){ + case ActiveMQDestination.QUEUE_TYPE: + queueRegion.removeDestination(context,destination,timeout); + break; + case ActiveMQDestination.TOPIC_TYPE: + topicRegion.removeDestination(context,destination,timeout); + break; + case ActiveMQDestination.TEMP_QUEUE_TYPE: + tempQueueRegion.removeDestination(context,destination,timeout); + break; + case ActiveMQDestination.TEMP_TOPIC_TYPE: + tempTopicRegion.removeDestination(context,destination,timeout); + break; + default: + throw createUnknownDestinationTypeException(destination); + } + } + } } public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ @@ -302,7 +306,10 @@ public class RegionBroker implements Broker { } public ActiveMQDestination[] getDestinations() throws Exception { - ArrayList l = new ArrayList(destinations); + ArrayList l; + synchronized(destinations) { + l = new ArrayList(destinations.values()); + } ActiveMQDestination rc[] = new ActiveMQDestination[l.size()]; l.toArray(rc); return rc; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java index 5e0e8cad0c..8f1eab9455 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java @@ -18,7 +18,12 @@ package org.apache.activemq.broker.region; import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.thread.TaskRunnerFactory; @@ -31,7 +36,26 @@ public class TempQueueRegion extends AbstractRegion { public TempQueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); - setAutoCreateDestinations(false); + // We should allow the following to be configurable via a Destination Policy + // setAutoCreateDestinations(false); + System.out.println("test"); + } + + protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { + final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; + return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) { + + public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { + + // Only consumers on the same connection can consume from + // the temporary destination + if( !context.isNetworkConnection() && !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) { + throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest); + } + super.addSubscription(context, sub); + }; + + }; } protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { @@ -46,4 +70,15 @@ public class TempQueueRegion extends AbstractRegion { return "TempQueueRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%"; } + public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { + + // Force a timeout value so that we don't get an error that + // there is still an active sub. Temp destination may be removed + // while a network sub is still active which is valid. + if( timeout == 0 ) + timeout = 1; + + super.removeDestination(context, destination, timeout); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java index 2823f23653..de1485b7fa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java @@ -20,6 +20,7 @@ package org.apache.activemq.broker.region; import javax.jms.JMSException; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.thread.TaskRunnerFactory; @@ -32,7 +33,8 @@ public class TempTopicRegion extends AbstractRegion { public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); - setAutoCreateDestinations(false); + // We should allow the following to be configurable via a Destination Policy + // setAutoCreateDestinations(false); } protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { @@ -47,5 +49,15 @@ public class TempTopicRegion extends AbstractRegion { return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%"; } + public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { + + // Force a timeout value so that we don't get an error that + // there is still an active sub. Temp destination may be removed + // while a network sub is still active which is valid. + if( timeout == 0 ) + timeout = 1; + + super.removeDestination(context, destination, timeout); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 3c44b68e58..1e46e82ea1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -341,6 +341,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { lastConnectSucceeded.set(true); serviceRemoteBrokerInfo(command); + // Let the local broker know the remote broker's ID. + localBroker.oneway(command); }else if(command.getClass() == ConnectionError.class ) { ConnectionError ce = (ConnectionError) command; diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java index 1d13f716ba..41a2ac905f 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java @@ -535,81 +535,94 @@ public class BrokerTest extends BrokerTestSupport { assertNoMessagesLeft(connection1); } - public void initCombosForTestTempDestinationsRemovedOnConnectionClose() { - addCombinationValues( "deliveryMode", new Object[]{ - new Integer(DeliveryMode.NON_PERSISTENT), - new Integer(DeliveryMode.PERSISTENT)} ); - addCombinationValues( "destinationType", new Object[]{ - new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), - new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} ); - } - public void testTempDestinationsRemovedOnConnectionClose() throws Exception { - - // Setup a first connection - StubConnection connection1 = createConnection(); - ConnectionInfo connectionInfo1 = createConnectionInfo(); - SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); - ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); - connection1.send(connectionInfo1); - connection1.send(sessionInfo1); - connection1.send(producerInfo1); +// +// TODO: need to reimplement this since we don't fail when we send to a non-existant +// destination. But if we can access the Region directly then we should be able to +// check that if the destination was removed. +// +// public void initCombosForTestTempDestinationsRemovedOnConnectionClose() { +// addCombinationValues( "deliveryMode", new Object[]{ +// new Integer(DeliveryMode.NON_PERSISTENT), +// new Integer(DeliveryMode.PERSISTENT)} ); +// addCombinationValues( "destinationType", new Object[]{ +// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), +// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} ); +// } +// +// public void testTempDestinationsRemovedOnConnectionClose() throws Exception { +// +// // Setup a first connection +// StubConnection connection1 = createConnection(); +// ConnectionInfo connectionInfo1 = createConnectionInfo(); +// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); +// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); +// connection1.send(connectionInfo1); +// connection1.send(sessionInfo1); +// connection1.send(producerInfo1); +// +// destination = createDestinationInfo(connection1, connectionInfo1, destinationType); +// +// StubConnection connection2 = createConnection(); +// ConnectionInfo connectionInfo2 = createConnectionInfo(); +// SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); +// ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); +// connection2.send(connectionInfo2); +// connection2.send(sessionInfo2); +// connection2.send(producerInfo2); +// +// // Send from connection2 to connection1's temp destination. Should succeed. +// connection2.send(createMessage(producerInfo2, destination, deliveryMode)); +// +// // Close connection 1 +// connection1.request(closeConnectionInfo(connectionInfo1)); +// +// try { +// // Send from connection2 to connection1's temp destination. Should not succeed. +// connection2.request(createMessage(producerInfo2, destination, deliveryMode)); +// fail("Expected JMSException."); +// } catch ( JMSException success ) { +// } +// +// } - destination = createDestinationInfo(connection1, connectionInfo1, destinationType); - - StubConnection connection2 = createConnection(); - ConnectionInfo connectionInfo2 = createConnectionInfo(); - SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); - ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); - connection2.send(connectionInfo2); - connection2.send(sessionInfo2); - connection2.send(producerInfo2); - - // Send from connection2 to connection1's temp destination. Should succeed. - connection2.send(createMessage(producerInfo2, destination, deliveryMode)); - - // Close connection 1 - connection1.request(closeConnectionInfo(connectionInfo1)); - - try { - // Send from connection2 to connection1's temp destination. Should not succeed. - connection2.request(createMessage(producerInfo2, destination, deliveryMode)); - fail("Expected JMSException."); - } catch ( JMSException success ) { - } - - } - - public void initCombosForTestTempDestinationsAreNotAutoCreated() { - addCombinationValues( "deliveryMode", new Object[]{ - new Integer(DeliveryMode.NON_PERSISTENT), - new Integer(DeliveryMode.PERSISTENT)} ); - addCombinationValues( "destinationType", new Object[]{ - new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), - new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} ); - } - public void testTempDestinationsAreNotAutoCreated() throws Exception { - - // Setup a first connection - StubConnection connection1 = createConnection(); - ConnectionInfo connectionInfo1 = createConnectionInfo(); - SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); - ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); - connection1.send(connectionInfo1); - connection1.send(sessionInfo1); - connection1.send(producerInfo1); - - destination = ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1", destinationType); - - // Should not be able to send to a non-existant temp destination. - try { - connection1.request(createMessage(producerInfo1, destination, deliveryMode)); - fail("Expected JMSException."); - } catch ( JMSException success ) { - } - - } +// public void initCombosForTestTempDestinationsAreNotAutoCreated() { +// addCombinationValues( "deliveryMode", new Object[]{ +// new Integer(DeliveryMode.NON_PERSISTENT), +// new Integer(DeliveryMode.PERSISTENT)} ); +// addCombinationValues( "destinationType", new Object[]{ +// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), +// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} ); +// } +// +// + + +// We create temp destination on demand now so this test case is no longer +// valid. +// +// public void testTempDestinationsAreNotAutoCreated() throws Exception { +// +// // Setup a first connection +// StubConnection connection1 = createConnection(); +// ConnectionInfo connectionInfo1 = createConnectionInfo(); +// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); +// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); +// connection1.send(connectionInfo1); +// connection1.send(sessionInfo1); +// connection1.send(producerInfo1); +// +// destination = ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1", destinationType); +// +// // Should not be able to send to a non-existant temp destination. +// try { +// connection1.request(createMessage(producerInfo1, destination, deliveryMode)); +// fail("Expected JMSException."); +// } catch ( JMSException success ) { +// } +// +// } public void initCombosForTestTempDestinationsOnlyAllowsLocalConsumers() { addCombinationValues( "deliveryMode", new Object[]{