fix for DuplexNetworkTest failure

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1214700 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2011-12-15 11:10:07 +00:00
parent 802f6b193a
commit ecb10e9519
2 changed files with 22 additions and 26 deletions

View File

@ -16,17 +16,16 @@
*/ */
package org.apache.activemq.command; package org.apache.activemq.command;
import java.io.IOException;
import java.util.Arrays;
import javax.jms.JMSException;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.JMSExceptionSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.Arrays;
/** /**
* @openwire:marshaller code="91" * @openwire:marshaller code="91"
* *
@ -106,6 +105,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
LOG.trace("ConsumerInfo advisory all ready routed once through target broker (" LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
+ networkBrokerId + "), path: " + networkBrokerId + "), path: "
+ Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message); + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
return false;
} }
} }
return true; return true;

View File

@ -16,22 +16,9 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
@ -42,6 +29,11 @@ import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import javax.jms.*;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
public class SimpleNetworkTest extends org.apache.activemq.TestSupport { public class SimpleNetworkTest extends org.apache.activemq.TestSupport {
protected static final int MESSAGE_COUNT = 10; protected static final int MESSAGE_COUNT = 10;
@ -89,7 +81,6 @@ public class SimpleNetworkTest extends org.apache.activemq.TestSupport {
} }
public void testFiltering() throws Exception { public void testFiltering() throws Exception {
MessageConsumer includedConsumer = remoteSession.createConsumer(included); MessageConsumer includedConsumer = remoteSession.createConsumer(included);
MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded); MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded);
MessageProducer includedProducer = localSession.createProducer(included); MessageProducer includedProducer = localSession.createProducer(included);
@ -109,7 +100,7 @@ public class SimpleNetworkTest extends org.apache.activemq.TestSupport {
MessageProducer producer = localSession.createProducer(included); MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 2); waitForConsumerRegistration(localBroker, 2, included);
for (int i = 0; i < MESSAGE_COUNT; i++) { for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i); Message test = localSession.createTextMessage("test-" + i);
@ -122,20 +113,23 @@ public class SimpleNetworkTest extends org.apache.activemq.TestSupport {
assertNull(consumer2.receive(1000)); assertNull(consumer2.receive(1000));
} }
private void waitForConsumerRegistration(final BrokerService brokerService, final int min) throws Exception { private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() { assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray(); Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
if (bridges.length > 0) { if (bridges.length > 0) {
LOG.info(brokerService + " bridges " + bridges); LOG.info(brokerService + " bridges " + Arrays.toString(bridges));
DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0]; DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap(); ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges); LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
if (!forwardingBridges.isEmpty()) { if (!forwardingBridges.isEmpty()) {
DemandSubscription demandSubscription = (DemandSubscription) forwardingBridges.values().toArray()[0]; for (DemandSubscription demandSubscription : forwardingBridges.values()) {
LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size()); if (demandSubscription.getLocalInfo().getDestination().equals(destination)) {
return demandSubscription.size() >= min; LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size());
return demandSubscription.size() >= min;
}
}
} }
} }
return false; return false;
@ -220,8 +214,10 @@ public class SimpleNetworkTest extends org.apache.activemq.TestSupport {
protected void doSetUp() throws Exception { protected void doSetUp() throws Exception {
remoteBroker = createRemoteBroker(); remoteBroker = createRemoteBroker();
remoteBroker.start(); remoteBroker.start();
remoteBroker.waitUntilStarted();
localBroker = createLocalBroker(); localBroker = createLocalBroker();
localBroker.start(); localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI(); URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true); fac.setAlwaysSyncSend(true);