diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempDestDemandSubscriptionCleanupTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempDestDemandSubscriptionCleanupTest.java new file mode 100644 index 0000000000..556da8192a --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempDestDemandSubscriptionCleanupTest.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.region.AbstractRegion; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * @author Christian Posta + */ +public class ThreeBrokerTempDestDemandSubscriptionCleanupTest extends JmsMultipleBrokersTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(ThreeBrokerTempDestDemandSubscriptionCleanupTest.class); + + boolean enableTempDestinationBridging = true; + + private static final String BROKER_A = "BrokerA"; + private static final String BROKER_B = "BrokerB"; + private static final String BROKER_C = "BrokerC"; + + private static final String ECHO_QUEUE_NAME = "echo"; + private static final int NUM_ITER = 100; + private static final long CONSUME_TIMEOUT = 500; + + + /** + * So we network three brokers together, and send a message with request-reply semantics. + * The message goes to an echo service listening on broker C. We send a message on a queue + * to broker A which gets demand forwarded to broker C. the echo service will respond to the + * temp destination listed in the JMSReplyTo header. that will get demand forwarded back to + * broker A. When the consumer of the temp dest on broker A closes, that subscription should + * be removed on broker A. advisories firing from broker A to broker B should remove that + * subscription on broker B. advisories firing from broker B to broker C should remove that + * subscription on broker C. + * + * @throws Exception + */ + public void testSubscriptionsCleanedUpRace() throws Exception { + + final BrokerItem brokerA = brokers.get(BROKER_A); + + + Runnable tester = new Runnable() { + + @Override + public void run() { + for (int i = 0; i < NUM_ITER; i++) { + + Connection conn = null; + try { + conn = brokerA.createConnection(); + + conn.start(); + + final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = sess.createQueue(ECHO_QUEUE_NAME); + + MessageProducer producer = sess.createProducer(destination); + + LOG.info("Starting iter: " + i); + Destination replyTo = sess.createTemporaryQueue(); + MessageConsumer responseConsumer = sess.createConsumer(replyTo); + + Message message = sess.createTextMessage("Iteration: " + i); + message.setJMSReplyTo(replyTo); + + producer.send(message); + + TextMessage response = (TextMessage)responseConsumer.receive(CONSUME_TIMEOUT); + assertNotNull("We should have gotten a response, but didn't for iter: " + i, response); + assertEquals("We got the wrong response from the echo service", "Iteration: " + i, response.getText()); + + + // so we close the consumer so that an actual RemoveInfo command gets propogated through the + // network + responseConsumer.close(); + conn.close(); + + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + + } + } + }; + + ExecutorService threadService = Executors.newFixedThreadPool(2); + threadService.submit(tester); + threadService.submit(tester); + + threadService.shutdown(); + assertTrue("executor done on time", threadService.awaitTermination(10l, TimeUnit.SECONDS)); + + // for the real test... we should not have any subscriptions left on broker C for the temp dests + BrokerItem brokerC = brokers.get(BROKER_C); + RegionBroker regionBroker = (RegionBroker) brokerC.broker.getRegionBroker(); + final AbstractRegion region = (AbstractRegion) regionBroker.getTempQueueRegion(); + + assertTrue("There were no lingering temp-queue destinations", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("Lingering temps: " + region.getSubscriptions().size()); + return 0 == region.getSubscriptions().size(); + } + })); + } + + + /** + * This test is slightly different from the above. We don't explicitly close the consumer down + * (which we did in the previous test to force the RemoveInfo to be sent). Here we just close + * the connection which should still clean up the subscriptions and temp destinations on the + * networked brokers. + * + * @throws Exception + */ + public void testSubscriptionsCleanedUpAfterConnectionClose() throws Exception { + + final BrokerItem brokerA = brokers.get(BROKER_A); + + for (int i = 0; i < NUM_ITER; i++) { + + Connection conn = null; + try { + conn = brokerA.createConnection(); + + conn.start(); + + final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = sess.createQueue(ECHO_QUEUE_NAME); + + MessageProducer producer = sess.createProducer(destination); + + LOG.info("Starting iter: " + i); + Destination replyTo = sess.createTemporaryQueue(); + MessageConsumer responseConsumer = sess.createConsumer(replyTo); + + Message message = sess.createTextMessage("Iteration: " + i); + message.setJMSReplyTo(replyTo); + + producer.send(message); + + TextMessage response = (TextMessage)responseConsumer.receive(CONSUME_TIMEOUT); + assertNotNull("We should have gotten a response, but didn't for iter: " + i, response); + assertEquals("We got the wrong response from the echo service", "Iteration: " + i, response.getText()); + + + // so closing the connection without closing the consumer first will leak subscriptions + // in a nob? +// responseConsumer.close(); + conn.close(); + + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + + } + + // for the real test... we should not have any subscriptions left on broker C for the temp dests + BrokerItem brokerC = brokers.get(BROKER_C); + RegionBroker regionBroker = (RegionBroker) brokerC.broker.getRegionBroker(); + final AbstractRegion region = (AbstractRegion) regionBroker.getTempQueueRegion(); + + assertTrue("There were no lingering temp-queue destinations", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("Lingering temps: " + region.getSubscriptions().size()); + return 0 == region.getSubscriptions().size(); + } + })); + + } + + private void installEchoClientOnBrokerC() throws Exception { + BrokerItem brokerC = brokers.get(BROKER_C); + Connection conn = brokerC.createConnection(); + conn.start(); + + final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = sess.createQueue(ECHO_QUEUE_NAME); + MessageConsumer consumer = sess.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + + TextMessage textMessage = (TextMessage) message; + + try { + Destination replyTo = message.getJMSReplyTo(); + + MessageProducer producer = sess.createProducer(replyTo); + Message response = sess.createTextMessage(textMessage.getText()); + + LOG.info("Replying to this request: " + textMessage.getText()); + producer.send(response); + producer.close(); + + } catch (JMSException e) { + e.printStackTrace(); + fail("Could not respond to an echo request"); + } + } + }); + } + + + @Override + protected void setUp() throws Exception { + super.setUp(); + createBroker(new URI("broker:(tcp://localhost:61616)/" + BROKER_A + "?persistent=false&useJmx=false")); + createBroker(new URI("broker:(tcp://localhost:61617)/" + BROKER_B + "?persistent=false&useJmx=false")); + createBroker(new URI("broker:(tcp://localhost:61618)/" + BROKER_C + "?persistent=false&useJmx=false")); + + bridgeBrokers("BrokerA", "BrokerB", false, 3); + bridgeBrokers("BrokerB", "BrokerC", false, 3); + + startAllBrokers(); + + // set up a listener on broker C that will demand forward subscriptions over the network + installEchoClientOnBrokerC(); + } + + protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception { + NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL, true); + connector.setBridgeTempDestinations(enableTempDestinationBridging); + connector.setDuplex(true); + return connector; + } +} \ No newline at end of file