diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index 8e8a8bf66b..cdcd0c8054 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -216,6 +216,18 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { return null; } + protected MessageConsumer createSyncConsumer(String brokerName, Destination dest) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + if (brokerItem != null) { + Connection con = brokerItem.createConnection(); + con.start(); + Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = sess.createConsumer(dest); + return consumer; + } + return null; + } + protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception { return createConsumer(brokerName, dest, null, null); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java index 4e2d2ef928..e36ff0c9ed 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java @@ -17,14 +17,19 @@ package org.apache.activemq.usecases; import java.net.URI; +import java.util.Arrays; import java.util.Enumeration; import javax.jms.Destination; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.QueueBrowser; import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.broker.region.QueueSubscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.MessageIdList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,7 +71,7 @@ public class BrowseOverNetworkTest extends JmsMultipleBrokersTestSupport { + msgsB.getMessageCount()); } - public void testconsumerInfo() throws Exception { + public void testConsumerInfo() throws Exception { createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker1.xml")); createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker2.xml")); @@ -74,7 +79,7 @@ public class BrowseOverNetworkTest extends JmsMultipleBrokersTestSupport { brokers.get("broker1").broker.waitUntilStarted(); - + Destination dest = createDestination("QUEUE.A,QUEUE.B", false); @@ -86,17 +91,138 @@ public class BrowseOverNetworkTest extends JmsMultipleBrokersTestSupport { } - protected int browseMessages(String broker, Destination dest) throws Exception { - QueueBrowser browser = createBrowser(broker, dest); + public class Browser extends Thread { + + String broker; + Destination dest; + int totalCount; + QueueBrowser browser = null; + MessageConsumer consumer = null; + boolean consume = false; + + public Browser(String broker, Destination dest) { + this.broker = broker; + this.dest = dest; + } + + public void run() { + int retries = 0; + while (retries++ < 5) { + try { + QueueBrowser browser = createBrowser(broker, dest); + int count = browseMessages(browser, broker); + LOG.info("browser '" + broker + "' browsed " + totalCount); + if (consume) { + if (count != 0) { + MessageConsumer consumer = createSyncConsumer(broker, dest); + totalCount += count; + for (int i = 0; i < count; i++) { + ActiveMQTextMessage message = (ActiveMQTextMessage)consumer.receive(1000); + LOG.info(broker + " consumer: " + message.getText() + " " + message.getDestination() + " " + message.getMessageId() + " " + Arrays.toString(message.getBrokerPath())); + if (message == null) break; + } + } + } else { + totalCount = count; + } + + Thread.sleep(1000); + } catch (Exception e) { + LOG.info("Exception browsing " + e, e); + } finally { + try { + if (browser != null) { + browser.close(); + } + if (consumer != null) { + consumer.close(); + } + } catch (Exception e) { + LOG.info("Exception closing browser " + e, e); + } + } + } + } + + public int getTotalCount() { + return totalCount; + } + } + + protected NetworkConnector bridgeBrokersWithIncludedDestination(String localBrokerName, String remoteBrokerName, ActiveMQDestination included, ActiveMQDestination excluded) throws Exception { + NetworkConnector nc = bridgeBrokers(localBrokerName, remoteBrokerName, false, 4, true); + nc.addStaticallyIncludedDestination(included); + if (excluded != null) { + nc.addExcludedDestination(excluded); + } + nc.setPrefetchSize(1); + return nc; + } + + + public void testMultipleBrowsers() throws Exception { + createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false&brokerId=BrokerA")); + createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false&brokerId=BrokerB")); + createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false&brokerId=BrokerC")); + createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD?persistent=false&useJmx=false&brokerId=BrokerD")); + + Destination composite = createDestination("TEST.FOO,TEST.BAR", false); + Destination dest1 = createDestination("TEST.FOO", false); + Destination dest2 = createDestination("TEST.BAR", false); + + bridgeBrokersWithIncludedDestination("BrokerA", "BrokerC", (ActiveMQDestination)composite, null); + bridgeBrokersWithIncludedDestination("BrokerA", "BrokerB", (ActiveMQDestination)composite, null); + bridgeBrokersWithIncludedDestination("BrokerA", "BrokerD", (ActiveMQDestination)composite, null); + bridgeBrokersWithIncludedDestination("BrokerB", "BrokerA", (ActiveMQDestination)composite, null); + bridgeBrokersWithIncludedDestination("BrokerB", "BrokerC", (ActiveMQDestination)composite, null); + bridgeBrokersWithIncludedDestination("BrokerB", "BrokerD", (ActiveMQDestination)composite, null); + bridgeBrokersWithIncludedDestination("BrokerC", "BrokerA", (ActiveMQDestination)dest2, (ActiveMQDestination)dest1); + bridgeBrokersWithIncludedDestination("BrokerC", "BrokerB", (ActiveMQDestination)dest2, (ActiveMQDestination)dest1); + bridgeBrokersWithIncludedDestination("BrokerC", "BrokerD", (ActiveMQDestination)dest2, (ActiveMQDestination)dest1); + bridgeBrokersWithIncludedDestination("BrokerD", "BrokerA", (ActiveMQDestination)dest1, (ActiveMQDestination)dest2); + bridgeBrokersWithIncludedDestination("BrokerD", "BrokerB", (ActiveMQDestination)dest1, (ActiveMQDestination)dest2); + bridgeBrokersWithIncludedDestination("BrokerD", "BrokerC", (ActiveMQDestination)dest1, (ActiveMQDestination)dest2); + + startAllBrokers(); + + brokers.get("BrokerA").broker.waitUntilStarted(); + brokers.get("BrokerC").broker.waitUntilStarted(); + brokers.get("BrokerD").broker.waitUntilStarted(); + + Browser browser1 = new Browser("BrokerC", composite); + browser1.start(); + + Browser browser2 = new Browser("BrokerD", composite); + browser2.start(); + + sendMessages("BrokerA", composite, MESSAGE_COUNT); + + browser1.join(); + browser2.join(); + + assertEquals(MESSAGE_COUNT * 2, browser1.getTotalCount() + browser2.getTotalCount() ); + + } + + protected int browseMessages(QueueBrowser browser, String name) throws Exception { Enumeration msgs = browser.getEnumeration(); int browsedMessage = 0; while (msgs.hasMoreElements()) { browsedMessage++; - msgs.nextElement(); + ActiveMQTextMessage message = (ActiveMQTextMessage)msgs.nextElement(); + LOG.info(name + " browsed: " + message.getText() + " " + message.getDestination() + " " + message.getMessageId() + " " + Arrays.toString(message.getBrokerPath())); } return browsedMessage; } + + protected int browseMessages(String broker, Destination dest) throws Exception { + QueueBrowser browser = createBrowser(broker, dest); + int browsedMessage = browseMessages(browser, "browser"); + browser.close(); + return browsedMessage; + } + public void setUp() throws Exception { super.setAutoFail(true); super.setUp(); diff --git a/activemq-core/src/test/resources/log4j.properties b/activemq-core/src/test/resources/log4j.properties index d2d1242094..e21fa9f84e 100755 --- a/activemq-core/src/test/resources/log4j.properties +++ b/activemq-core/src/test/resources/log4j.properties @@ -21,6 +21,7 @@ log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.activemq.broker.scheduler=DEBUG +#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG #log4j.logger.org.apache.activemq=TRACE #log4j.logger.org.apache.activemq.store.jdbc=TRACE #log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG diff --git a/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml b/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml index 3807567e90..8a27d497dd 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml @@ -25,7 +25,7 @@ + persistent="true" start="false" advisorySupport="true" deleteAllMessagesOnStartup="true"> diff --git a/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml b/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml index 3d6ba7f92d..0b34f0e4e6 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml @@ -26,7 +26,7 @@ + persistent="true" start="false" advisorySupport="true" deleteAllMessagesOnStartup="true">