From b9045dba98f053e31fb93922978334b126ad146b Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 31 Mar 2011 15:11:09 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3253 - Support Temporary Destinations in a network without advisories. Allow the connection id generator prefix to be set via a connection factory such that temp identies can be configured such that they are suitable for inclusion in a network list of statically included destintions. Allow auto recreation of temp destinations by a new producer and tie lifecycle to the producers connection. This allows configurable support for request reply with temps in a network with advisory support disabled git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1087330 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/camel/CamelConnection.java | 5 +- .../camel/CamelConnectionFactory.java | 2 +- .../apache/activemq/ActiveMQConnection.java | 4 +- .../activemq/ActiveMQConnectionFactory.java | 28 +- .../apache/activemq/ActiveMQXAConnection.java | 5 +- .../activemq/ActiveMQXAConnectionFactory.java | 2 +- .../apache/activemq/broker/BrokerService.java | 4 - .../broker/region/AbstractRegion.java | 14 + .../apache/activemq/broker/region/Queue.java | 2 +- .../activemq/broker/region/RegionBroker.java | 2 +- .../activemq/command/ActiveMQTempQueue.java | 1 + .../activemq/command/ActiveMQTempTopic.java | 1 + .../simple/SimpleDiscoveryAgent.java | 5 +- .../JmsMultipleBrokersTestSupport.java | 13 +- .../RequestReplyNoAdvisoryNetworkTest.java | 239 ++++++++++++++++++ 15 files changed, 310 insertions(+), 17 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java b/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java index 16d5c60123..7f39fd0864 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java @@ -30,8 +30,9 @@ public class CamelConnection extends ActiveMQConnection implements CamelContextA private CamelContext camelContext; - protected CamelConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception { - super(transport, clientIdGenerator, factoryStats); + protected CamelConnection(Transport transport, IdGenerator clientIdGenerator, + IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception { + super(transport, clientIdGenerator, connectionIdGenerator, factoryStats); } public CamelContext getCamelContext() { diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java b/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java index 10e5b40d94..0664504a33 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java @@ -45,7 +45,7 @@ public class CamelConnectionFactory extends ActiveMQConnectionFactory implements // Implementation methods //----------------------------------------------------------------------- protected CamelConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { - CamelConnection connection = new CamelConnection(transport, getClientIdGenerator(), stats); + CamelConnection connection = new CamelConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats); CamelContext context = getCamelContext(); if (context != null) { connection.setCamelContext(context); diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 101440c2a4..d25962a060 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -200,7 +200,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @param factoryStats * @throws Exception */ - protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception { + protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception { this.transport = transport; this.clientIdGenerator = clientIdGenerator; @@ -216,7 +216,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } }); // asyncConnectionThread.allowCoreThreadTimeOut(true); - String uniqueId = CONNECTION_ID_GENERATOR.generateId(); + String uniqueId = connectionIdGenerator.generateId(); this.info = new ConnectionInfo(new ConnectionId(uniqueId)); this.info.setManageable(true); this.info.setFaultTolerant(transport.isFaultTolerant()); diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 1d8140a22f..54b5e3cf42 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -83,6 +83,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private IdGenerator clientIdGenerator; private String clientIDPrefix; + private IdGenerator connectionIdGenerator; + private String connectionIDPrefix; // client policies private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); @@ -288,7 +290,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne } protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { - ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats); + ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), + getConnectionIdGenerator(), stats); return connection; } @@ -843,6 +846,29 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne this.clientIdGenerator = clientIdGenerator; } + /** + * Sets the prefix used by connection id generator + * @param connectionIDPrefix + */ + public void setConnectionIDPrefix(String connectionIDPrefix) { + this.connectionIDPrefix = connectionIDPrefix; + } + + protected synchronized IdGenerator getConnectionIdGenerator() { + if (connectionIdGenerator == null) { + if (connectionIDPrefix != null) { + connectionIdGenerator = new IdGenerator(connectionIDPrefix); + } else { + connectionIdGenerator = new IdGenerator(); + } + } + return connectionIdGenerator; + } + + protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) { + this.connectionIdGenerator = connectionIdGenerator; + } + /** * @return the statsEnabled */ diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java index e994aa8679..e32af82edb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java @@ -50,8 +50,9 @@ import org.apache.activemq.util.IdGenerator; */ public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection { - protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception { - super(transport, clientIdGenerator, factoryStats); + protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator, + IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception { + super(transport, clientIdGenerator, connectionIdGenerator, factoryStats); } public XASession createXASession() throws JMSException { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java index abec2e823f..b2a715b1c1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java @@ -80,7 +80,7 @@ public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory imple } protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { - ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(), stats); + ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats); return connection; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 2af5956214..fb79becbf1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -297,10 +297,6 @@ public class BrokerService implements Service { * @throws Exception */ public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { - if (!isAdvisorySupport()) { - throw new javax.jms.IllegalStateException( - "Networks require advisory messages to function - advisories are currently disabled"); - } NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); return addNetworkConnector(connector); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index c75b70cf63..e681ca07e9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -28,10 +28,12 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.DestinationAlreadyExistsException; import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.TransportConnection; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatchNotification; @@ -130,6 +132,18 @@ public abstract class AbstractRegion implements Region { destinations.put(destination, dest); destinationMap.put(destination, dest); addSubscriptionsForDestination(context, dest); + if (destination.isTemporary()) { + // need to associate with the connection so it can get removed + if (context.getConnection() instanceof TransportConnection) { + TransportConnection transportConnection = (TransportConnection) context.getConnection(); + DestinationInfo info = new DestinationInfo(context.getConnectionId(), + DestinationInfo.ADD_OPERATION_TYPE, + destination); + transportConnection.processAddDestination(info); + LOG.debug("assigning ownership of auto created temp : " + destination + " to connection:" + + context.getConnectionId()); + } + } } if (dest == null) { throw new JMSException("The destination " + destination + " does not exist."); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index f3a39041e0..8a3558f3e1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -834,7 +834,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { }finally { messagesLock.readLock().unlock(); } - return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + return destination.getQualifiedName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups=" + messageGroupOwners; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 0b7e62a1f1..65ba88afdd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -393,7 +393,7 @@ public class RegionBroker extends EmptyBroker { // This seems to cause the destination to be added but without // advisories firing... - context.getBroker().addDestination(context, destination, false); + context.getBroker().addDestination(context, destination, true); switch (destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: queueRegion.addProducer(context, info); diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java index 9708e86553..0f90a4bb08 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java @@ -20,6 +20,7 @@ import javax.jms.JMSException; import javax.jms.TemporaryQueue; /** + * @org.apache.xbean.XBean element="tempQueue" description="An ActiveMQ Temporary Queue Destination" * @openwire:marshaller code="102" * */ diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java index 6a1dd80512..0d1a4931c7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java @@ -20,6 +20,7 @@ import javax.jms.JMSException; import javax.jms.TemporaryTopic; /** + * @org.apache.xbean.XBean element="tempTopic" description="An ActiveMQ Temporary Topic Destination" * @openwire:marshaller code="103" * */ diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java index 586912a4a6..4e8c19688f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java @@ -127,19 +127,21 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { event.connectFailures++; if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) { - LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled."); + LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled."); return; } synchronized (sleepMutex) { try { if (!running.get()) { + LOG.debug("Reconnecting disabled: stopped"); return; } LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect."); sleepMutex.wait(event.reconnectDelay); } catch (InterruptedException ie) { + LOG.debug("Reconnecting disabled: " + ie); Thread.currentThread().interrupt(); return; } @@ -161,6 +163,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { } if (!running.get()) { + LOG.debug("Reconnecting disabled: stopped"); return; } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index 516e9054cf..6b82f62051 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -52,11 +52,14 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkBridge; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.MessageIdList; import org.apache.activemq.util.Wait; import org.apache.activemq.xbean.BrokerFactoryBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.core.io.Resource; /** @@ -66,6 +69,7 @@ import org.springframework.core.io.Resource; * */ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmsMultipleBrokersTestSupport.class); public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0"; public static int maxSetupTime = 5000; @@ -170,7 +174,14 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { if (!broker.getNetworkConnectors().isEmpty()) { result = Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - return (broker.getNetworkConnectors().get(bridgeIndex).activeBridges().size() >= min); + int activeCount = 0; + for (NetworkBridge bridge : broker.getNetworkConnectors().get(bridgeIndex).activeBridges()) { + if (bridge.getRemoteBrokerName() != null) { + LOG.info("found bridge to " + bridge.getRemoteBrokerName() + " on broker :" + broker.getBrokerName()); + activeCount++; + } + } + return activeCount >= min; }}, wait); } return result; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java new file mode 100644 index 0000000000..a00467f1d9 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; +import java.net.URLStreamHandlerFactory; +import java.util.Map; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.Wait; +import org.apache.activemq.xbean.XBeanBrokerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(RequestReplyNoAdvisoryNetworkTest.class); + + BrokerService a, b; + ActiveMQQueue sendQ = new ActiveMQQueue("sendQ"); + static final String connectionIdMarker = "ID:marker."; + ActiveMQTempQueue replyQWildcard = new ActiveMQTempQueue(connectionIdMarker + ">"); + private long receiveTimeout = 30000; + + public void testNonAdvisoryNetworkRequestReplyXmlConfig() throws Exception { + final String xmlConfigString = new String( + "" + + " " + + " " + + " " + + " " + + " "+ + " "+ + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + " " + + ""); + final String localProtocolScheme = "inline"; + URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() { + @Override + public URLStreamHandler createURLStreamHandler(String protocol) { + if (localProtocolScheme.equalsIgnoreCase(protocol)) { + return new URLStreamHandler() { + @Override + protected URLConnection openConnection(URL u) throws IOException { + return new URLConnection(u) { + @Override + public void connect() throws IOException { + } + @Override + public InputStream getInputStream() throws IOException { + return new ByteArrayInputStream(xmlConfigString.replace("%HOST%", url.getFile()).getBytes("UTF-8")); + } + }; + } + }; + } + return null; + } + }); + a = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme + ":A")); + b = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme + ":B")); + + doTestNonAdvisoryNetworkRequestReply(); + } + + public void testNonAdvisoryNetworkRequestReply() throws Exception { + createBridgeAndStartBrokers(); + doTestNonAdvisoryNetworkRequestReply(); + } + + public void doTestNonAdvisoryNetworkRequestReply() throws Exception { + + waitForBridgeFormation(a, 1, 0); + waitForBridgeFormation(b, 1, 0); + + ActiveMQConnectionFactory sendFactory = createConnectionFactory(a); + ActiveMQConnection sendConnection = createConnection(sendFactory); + + ActiveMQSession sendSession = (ActiveMQSession)sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(sendQ); + ActiveMQTempQueue realReplyQ = (ActiveMQTempQueue) sendSession.createTemporaryQueue(); + TextMessage message = sendSession.createTextMessage("1"); + message.setJMSReplyTo(realReplyQ); + producer.send(message); + + // responder + ActiveMQConnectionFactory consumerFactory = createConnectionFactory(b); + ActiveMQConnection consumerConnection = createConnection(consumerFactory); + + ActiveMQSession consumerSession = (ActiveMQSession)consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(sendQ); + TextMessage received = (TextMessage) consumer.receive(receiveTimeout); + assertNotNull(received); + + LOG.info("got request, sending reply"); + + MessageProducer consumerProducer = consumerSession.createProducer(received.getJMSReplyTo()); + consumerProducer.send(consumerSession.createTextMessage("got " + received.getText())); + // temp dest on reply broker tied to this connection, setOptimizedDispatch=true ensures + // message gets delivered before destination is removed + consumerConnection.close(); + + // reply consumer + MessageConsumer replyConsumer = sendSession.createConsumer(realReplyQ); + TextMessage reply = (TextMessage) replyConsumer.receive(receiveTimeout); + assertNotNull("expected reply message", reply); + assertEquals("text is as expected", "got 1", reply.getText()); + sendConnection.close(); + + verifyAllTempQueuesAreGone(); + } + + private void verifyAllTempQueuesAreGone() throws Exception { + for (BrokerService brokerService : new BrokerService[]{a, b}) { + RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); + Map temps = regionBroker.getTempTopicRegion().getDestinationMap(); + assertTrue("no temp topics on " + brokerService + ", " + temps, temps.isEmpty()); + temps = regionBroker.getTempQueueRegion().getDestinationMap(); + assertTrue("no temp queues on " + brokerService + ", " + temps, temps.isEmpty()); + } + } + + private ActiveMQConnection createConnection(ActiveMQConnectionFactory factory) throws Exception { + ActiveMQConnection c =(ActiveMQConnection) factory.createConnection(); + c.start(); + return c; + } + + private ActiveMQConnectionFactory createConnectionFactory(BrokerService brokerService) throws Exception { + String target = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); + ActiveMQConnectionFactory factory = + new ActiveMQConnectionFactory(target); + factory.setWatchTopicAdvisories(false); + factory.setConnectionIDPrefix(connectionIdMarker + brokerService.getBrokerName()); + return factory; + } + + public void createBridgeAndStartBrokers() throws Exception { + a = configureBroker("A"); + b = configureBroker("B"); + bridge(a, b); + bridge(b, a); + a.start(); + b.start(); + } + + public void tearDown() throws Exception { + stop(a); + stop(b); + } + + private void stop(BrokerService broker) throws Exception { + if (broker != null) { + broker.stop(); + } + } + + private void bridge(BrokerService from, BrokerService to) throws Exception { + TransportConnector toConnector = to.addConnector("tcp://localhost:0"); + NetworkConnector bridge = + from.addNetworkConnector("static://" + toConnector.getPublishableConnectString()); + bridge.addStaticallyIncludedDestination(sendQ); + bridge.addStaticallyIncludedDestination(replyQWildcard); + } + + private BrokerService configureBroker(String brokerName) throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName(brokerName); + broker.setAdvisorySupport(false); + broker.setPersistent(false); + broker.setUseJmx(false); + + PolicyMap map = new PolicyMap(); + PolicyEntry tempReplyQPolicy = new PolicyEntry(); + tempReplyQPolicy.setOptimizedDispatch(true); + map.put(replyQWildcard, tempReplyQPolicy); + broker.setDestinationPolicy(map); + return broker; + } +}