diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 0abe6889b6..e7d103e2ff 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -1227,6 +1227,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return configuration.isDuplex() || createdByDuplex; } + public ConcurrentHashMap getLocalSubscriptionMap() { + return subscriptionMapByRemoteId; + } + public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; this.localBrokerId = brokerService.getRegionBroker().getBrokerId(); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java index 1fe36a0c94..da03e4f83b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java @@ -77,6 +77,9 @@ public class DemandSubscription { return remoteSubsIds.isEmpty(); } + public int size() { + return remoteSubsIds.size(); + } /** * @return Returns the localInfo. */ diff --git a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index 96510c0060..9e1c3b3b72 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.network; import java.net.URI; +import java.util.concurrent.ConcurrentHashMap; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -32,6 +33,8 @@ import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.util.Wait; import org.apache.activemq.xbean.BrokerFactoryBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +108,9 @@ public class SimpleNetworkTest extends org.apache.activemq.TestSupport { MessageConsumer consumer2 = remoteSession.createConsumer(included); MessageProducer producer = localSession.createProducer(included); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - Thread.sleep(2000); + + waitForConsumerRegistration(localBroker, 2); + for (int i = 0; i < MESSAGE_COUNT; i++) { Message test = localSession.createTextMessage("test-" + i); producer.send(test); @@ -117,6 +122,27 @@ public class SimpleNetworkTest extends org.apache.activemq.TestSupport { assertNull(consumer2.receive(1000)); } + private void waitForConsumerRegistration(final BrokerService brokerService, final int min) throws Exception { + assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray(); + if (bridges.length > 0) { + LOG.info(brokerService + " bridges " + bridges); + DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0]; + ConcurrentHashMap forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap(); + LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges); + if (!forwardingBridges.isEmpty()) { + DemandSubscription demandSubscription = (DemandSubscription) forwardingBridges.values().toArray()[0]; + LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size()); + return demandSubscription.size() >= min; + } + } + return false; + } + })); + } + public void testDurableStoreAndForward() throws Exception { // create a remote durable consumer MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);