Modifing the AdivsoryBroker to set the originBrokerURL to the transport connector's
URL if it has been set versus using a default URL.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-06-17 17:47:24 +00:00
parent 428fc82c8c
commit fc3e026122
2 changed files with 29 additions and 1 deletions

View File

@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.DurableTopicSubscription;
@ -627,7 +628,12 @@ public class AdvisoryBroker extends BrokerFilter {
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
String url = getBrokerService().getVmConnectorURI().toString(); String url = getBrokerService().getVmConnectorURI().toString();
if (getBrokerService().getDefaultSocketURIString() != null) { //try and find the URL on the transport connector and use if it exists else
//try and find a default URL
if (context.getConnector() instanceof TransportConnector
&& ((TransportConnector) context.getConnector()).getPublishableConnectString() != null) {
url = ((TransportConnector) context.getConnector()).getPublishableConnectString();
} else if (getBrokerService().getDefaultSocketURIString() != null) {
url = getBrokerService().getDefaultSocketURIString(); url = getBrokerService().getDefaultSocketURIString();
} }
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.advisory;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -147,6 +148,11 @@ public class AdvisoryTests {
assertNotNull(msg); assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//This should always be tcp:// because that is the transport that is used to connect even though
//the nio transport is the first one in the list
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://"));
//Add assertion to make sure body is included for advisory topics //Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true //when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload); assertIncludeBodyForAdvisory(payload);
@ -177,6 +183,11 @@ public class AdvisoryTests {
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
String originalId = payload.getJMSMessageID(); String originalId = payload.getJMSMessageID();
assertEquals(originalId, id); assertEquals(originalId, id);
//This should always be tcp:// because that is the transport that is used to connect even though
//the nio transport is the first one in the list
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://"));
//Add assertion to make sure body is included for advisory topics //Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true //when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload); assertIncludeBodyForAdvisory(payload);
@ -204,6 +215,10 @@ public class AdvisoryTests {
assertNotNull(msg); assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//This should be set
assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
//Add assertion to make sure body is included for advisory topics //Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true //when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload); assertIncludeBodyForAdvisory(payload);
@ -235,6 +250,8 @@ public class AdvisoryTests {
assertNotNull(msg); assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//This should be set
assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
//Add assertion to make sure body is included for DLQ advisory topics //Add assertion to make sure body is included for DLQ advisory topics
//when includeBodyForAdvisory is true //when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload); assertIncludeBodyForAdvisory(payload);
@ -265,6 +282,10 @@ public class AdvisoryTests {
assertNotNull(msg); assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//This should be set
assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
//Add assertion to make sure body is included for advisory topics //Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true //when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload); assertIncludeBodyForAdvisory(payload);
@ -319,6 +340,7 @@ public class AdvisoryTests {
pMap.setDefaultEntry(policy); pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap); answer.setDestinationPolicy(pMap);
answer.addConnector("nio://localhost:0");
answer.addConnector(bindAddress); answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
} }