mirror of https://github.com/apache/activemq.git
- Send the ConnectionInfo first before sending the BrokerInfo in the DemandForwardingBridge, so it would work in http.
- Enabled the TwoBrokerTopicSendReceiveUsingHttpTest. - Added a sleep to the test to give jetty enough time to setup before sending messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@386738 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8b3fb9b928
commit
9920d70580
|
@ -173,8 +173,10 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
||||||
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
|
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
|
||||||
localConnectionInfo.setClientId(localClientId);
|
localConnectionInfo.setClientId(localClientId);
|
||||||
localBroker.oneway(localConnectionInfo);
|
localBroker.oneway(localConnectionInfo);
|
||||||
|
|
||||||
localSessionInfo=new SessionInfo(localConnectionInfo,1);
|
localSessionInfo=new SessionInfo(localConnectionInfo,1);
|
||||||
localBroker.oneway(localSessionInfo);
|
localBroker.oneway(localSessionInfo);
|
||||||
|
|
||||||
log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
|
log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
|
||||||
+") has been established.");
|
+") has been established.");
|
||||||
startedLatch.countDown();
|
startedLatch.countDown();
|
||||||
|
@ -184,18 +186,23 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
||||||
|
|
||||||
protected void startRemoteBridge() throws IOException {
|
protected void startRemoteBridge() throws IOException {
|
||||||
if(remoteBridgeStarted.compareAndSet(false,true)){
|
if(remoteBridgeStarted.compareAndSet(false,true)){
|
||||||
BrokerInfo brokerInfo=new BrokerInfo();
|
|
||||||
brokerInfo.setBrokerName(localBrokerName);
|
|
||||||
remoteBroker.oneway(brokerInfo);
|
|
||||||
remoteConnectionInfo=new ConnectionInfo();
|
remoteConnectionInfo=new ConnectionInfo();
|
||||||
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
|
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
|
||||||
remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
|
remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
|
||||||
remoteBroker.oneway(remoteConnectionInfo);
|
remoteBroker.oneway(remoteConnectionInfo);
|
||||||
|
|
||||||
|
BrokerInfo brokerInfo=new BrokerInfo();
|
||||||
|
brokerInfo.setBrokerName(localBrokerName);
|
||||||
|
remoteBroker.oneway(brokerInfo);
|
||||||
|
|
||||||
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
|
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
|
||||||
remoteBroker.oneway(remoteSessionInfo);
|
remoteBroker.oneway(remoteSessionInfo);
|
||||||
|
|
||||||
producerInfo=new ProducerInfo(remoteSessionInfo,1);
|
producerInfo=new ProducerInfo(remoteSessionInfo,1);
|
||||||
producerInfo.setResponseRequired(false);
|
producerInfo.setResponseRequired(false);
|
||||||
remoteBroker.oneway(producerInfo);
|
remoteBroker.oneway(producerInfo);
|
||||||
|
|
||||||
// Listen to consumer advisory messages on the remote broker to determine demand.
|
// Listen to consumer advisory messages on the remote broker to determine demand.
|
||||||
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
|
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
|
||||||
demandConsumerInfo.setDispatchAsync(dispatchAsync);
|
demandConsumerInfo.setDispatchAsync(dispatchAsync);
|
||||||
|
@ -203,6 +210,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
||||||
+destinationFilter));
|
+destinationFilter));
|
||||||
demandConsumerInfo.setPrefetchSize(prefetchSize);
|
demandConsumerInfo.setPrefetchSize(prefetchSize);
|
||||||
remoteBroker.oneway(demandConsumerInfo);
|
remoteBroker.oneway(demandConsumerInfo);
|
||||||
|
|
||||||
startedLatch.countDown();
|
startedLatch.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -323,8 +323,6 @@
|
||||||
<excludes>
|
<excludes>
|
||||||
<!-- http://jira.activemq.org/jira/browse/AMQ-586: fails on Windows 2003 -->
|
<!-- http://jira.activemq.org/jira/browse/AMQ-586: fails on Windows 2003 -->
|
||||||
<exclude>**/TwoBrokerTopicSendReceiveTest.*</exclude>
|
<exclude>**/TwoBrokerTopicSendReceiveTest.*</exclude>
|
||||||
<!-- http://jira.activemq.org/jira/browse/AMQ-563: HTTP not yet implemented -->
|
|
||||||
<exclude>**/TwoBrokerTopicSendReceiveUsingHttpTest.*</exclude>
|
|
||||||
<!-- http://jira.activemq.org/jira/browse/AMQ-537 -->
|
<!-- http://jira.activemq.org/jira/browse/AMQ-537 -->
|
||||||
<exclude>**/PublishOnQueueConsumedMessageUsingActivemqXMLTest.*</exclude>
|
<exclude>**/PublishOnQueueConsumedMessageUsingActivemqXMLTest.*</exclude>
|
||||||
<!-- http://jira.activemq.org/jira/browse/AMQ-538 -->
|
<!-- http://jira.activemq.org/jira/browse/AMQ-538 -->
|
||||||
|
|
|
@ -25,6 +25,14 @@ import javax.jms.JMSException;
|
||||||
*/
|
*/
|
||||||
public class TwoBrokerTopicSendReceiveUsingHttpTest extends TwoBrokerTopicSendReceiveTest {
|
public class TwoBrokerTopicSendReceiveUsingHttpTest extends TwoBrokerTopicSendReceiveTest {
|
||||||
|
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
// Give jetty server enough time to setup,
|
||||||
|
// so we don't lose messages when connection fails
|
||||||
|
Thread.sleep(5000);
|
||||||
|
}
|
||||||
|
|
||||||
protected ActiveMQConnectionFactory createReceiverConnectionFactory() throws JMSException {
|
protected ActiveMQConnectionFactory createReceiverConnectionFactory() throws JMSException {
|
||||||
return createConnectionFactory("org/apache/activemq/usecases/receiver-http.xml", "receiver", "vm://receiver");
|
return createConnectionFactory("org/apache/activemq/usecases/receiver-http.xml", "receiver", "vm://receiver");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue