Wait for bridge formation before attempting to receive otherwise the

timed receive might not work.
This commit is contained in:
Timothy Bish 2016-06-10 11:22:45 -04:00
parent 534dbee447
commit cf750d5cee
1 changed files with 55 additions and 2 deletions

View File

@ -33,9 +33,11 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.After; import org.junit.After;
@ -114,6 +116,14 @@ public class TopicOutboundBridgeReconnectTest {
MessageConsumer consumer = createConsumer(); MessageConsumer consumer = createConsumer();
assertTrue("Should have a bridge to the topic", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToProducerBroker().getTopicSubscribers().length > 0;
}
}));
sendMessage("test123"); sendMessage("test123");
Message message = consumer.receive(5000); Message message = consumer.receive(5000);
assertNotNull(message); assertNotNull(message);
@ -124,6 +134,14 @@ public class TopicOutboundBridgeReconnectTest {
stopProducerBroker(); stopProducerBroker();
startProducerBroker(); startProducerBroker();
assertTrue("Should have a bridge to the topic", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToProducerBroker().getTopicSubscribers().length > 0;
}
}));
sendMessage("test123"); sendMessage("test123");
message = consumer.receive(5000); message = consumer.receive(5000);
assertNotNull(message); assertNotNull(message);
@ -201,6 +219,15 @@ public class TopicOutboundBridgeReconnectTest {
startConsumerBroker(); startConsumerBroker();
MessageConsumer consumer = createConsumer(); MessageConsumer consumer = createConsumer();
final BrokerViewMBean producerBrokerView = getProxyToProducerBroker();
assertTrue("Should have a bridge to the topic", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return producerBrokerView.getTopicSubscribers().length > 0;
}
}));
sendMessage("test123"); sendMessage("test123");
Message message = consumer.receive(2000); Message message = consumer.receive(2000);
assertNotNull(message); assertNotNull(message);
@ -266,11 +293,36 @@ public class TopicOutboundBridgeReconnectTest {
} }
} }
protected BrokerViewMBean getProxyToProducerBroker() throws Exception {
if (producerBroker == null) {
throw new IllegalStateException("Producer broker is not running.");
}
ObjectName brokerViewMBean = new ObjectName(
"org.apache.activemq:type=Broker,brokerName=" + producerBroker.getBrokerName());
BrokerViewMBean proxy = (BrokerViewMBean) producerBroker.getManagementContext()
.newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
return proxy;
}
protected BrokerViewMBean getProxyToConsumerBroker() throws Exception {
if (producerBroker == null) {
throw new IllegalStateException("Consumer broker is not running.");
}
ObjectName brokerViewMBean = new ObjectName(
"org.apache.activemq:type=Broker,brokerName=" + consumerBroker.getBrokerName());
BrokerViewMBean proxy = (BrokerViewMBean) consumerBroker.getManagementContext()
.newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
return proxy;
}
protected BrokerService createFirstBroker() throws Exception { protected BrokerService createFirstBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setBrokerName("broker1"); broker.setBrokerName("broker1");
broker.setPersistent(false); broker.setPersistent(false);
broker.setUseJmx(false); broker.setUseJmx(true);
broker.getManagementContext().setCreateConnector(false);
broker.addConnector("tcp://localhost:61616"); broker.addConnector("tcp://localhost:61616");
broker.addConnector("vm://broker1"); broker.addConnector("vm://broker1");
@ -289,7 +341,8 @@ public class TopicOutboundBridgeReconnectTest {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setBrokerName("broker2"); broker.setBrokerName("broker2");
broker.setPersistent(false); broker.setPersistent(false);
broker.setUseJmx(false); broker.setUseJmx(true);
broker.getManagementContext().setCreateConnector(false);
broker.addConnector("tcp://localhost:61617"); broker.addConnector("tcp://localhost:61617");
broker.addConnector("vm://broker2"); broker.addConnector("vm://broker2");