mirror of https://github.com/apache/activemq.git
resolve intermittent failure, wait for both of the conduit subs to register before test initiation. additional accessors to make validation possible
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1213209 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d4ccc50ea7
commit
8d0cf31f84
|
@ -1227,6 +1227,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
return configuration.isDuplex() || createdByDuplex;
|
||||
}
|
||||
|
||||
public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
|
||||
return subscriptionMapByRemoteId;
|
||||
}
|
||||
|
||||
public void setBrokerService(BrokerService brokerService) {
|
||||
this.brokerService = brokerService;
|
||||
this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
|
||||
|
|
|
@ -77,6 +77,9 @@ public class DemandSubscription {
|
|||
return remoteSubsIds.isEmpty();
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return remoteSubsIds.size();
|
||||
}
|
||||
/**
|
||||
* @return Returns the localInfo.
|
||||
*/
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.network;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
|
@ -32,6 +33,8 @@ import javax.jms.TopicSession;
|
|||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.activemq.xbean.BrokerFactoryBean;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -105,7 +108,9 @@ public class SimpleNetworkTest extends org.apache.activemq.TestSupport {
|
|||
MessageConsumer consumer2 = remoteSession.createConsumer(included);
|
||||
MessageProducer producer = localSession.createProducer(included);
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
Thread.sleep(2000);
|
||||
|
||||
waitForConsumerRegistration(localBroker, 2);
|
||||
|
||||
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||
Message test = localSession.createTextMessage("test-" + i);
|
||||
producer.send(test);
|
||||
|
@ -117,6 +122,27 @@ public class SimpleNetworkTest extends org.apache.activemq.TestSupport {
|
|||
assertNull(consumer2.receive(1000));
|
||||
}
|
||||
|
||||
private void waitForConsumerRegistration(final BrokerService brokerService, final int min) throws Exception {
|
||||
assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
|
||||
if (bridges.length > 0) {
|
||||
LOG.info(brokerService + " bridges " + bridges);
|
||||
DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
|
||||
ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
|
||||
LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
|
||||
if (!forwardingBridges.isEmpty()) {
|
||||
DemandSubscription demandSubscription = (DemandSubscription) forwardingBridges.values().toArray()[0];
|
||||
LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size());
|
||||
return demandSubscription.size() >= min;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
public void testDurableStoreAndForward() throws Exception {
|
||||
// create a remote durable consumer
|
||||
MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);
|
||||
|
|
Loading…
Reference in New Issue