AMQ-7352 - Add support for anonymous producer advisories

By default this behavior is turned off but can be enabled by setting
anonymousProducerAdvisorySupport on the BrokerService to true
This commit is contained in:
Christopher L. Shannon (cshannon) 2019-11-22 13:35:32 -05:00
parent a7bf4fc804
commit 0c6f9a9a1e
4 changed files with 122 additions and 8 deletions

View File

@ -229,8 +229,10 @@ public class AdvisoryBroker extends BrokerFilter {
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.addProducer(context, info); super.addProducer(context, info);
// Don't advise advisory topics. //Verify destination is either non-null or that we want to advise anonymous producers on null destination
if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { //Don't advise advisory topics.
if ((info.getDestination() != null || getBrokerService().isAnonymousProducerAdvisorySupport())
&& !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
fireProducerAdvisory(context, info.getDestination(), topic, info); fireProducerAdvisory(context, info.getDestination(), topic, info);
producers.put(info.getProducerId(), info); producers.put(info.getProducerId(), info);
@ -412,12 +414,13 @@ public class AdvisoryBroker extends BrokerFilter {
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info); super.removeProducer(context, info);
// Don't advise advisory topics. //Verify destination is either non-null or that we want to advise anonymous producers on null destination
//Don't advise advisory topics.
ActiveMQDestination dest = info.getDestination(); ActiveMQDestination dest = info.getDestination();
if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) { if ((dest != null || getBrokerService().isAnonymousProducerAdvisorySupport()) && !AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest); ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
producers.remove(info.getProducerId()); producers.remove(info.getProducerId());
if (!dest.isTemporary() || destinations.containsKey(dest)) { if (dest == null || !dest.isTemporary() || destinations.containsKey(dest)) {
fireProducerAdvisory(context, dest, topic, info.createRemoveCommand()); fireProducerAdvisory(context, dest, topic, info.createRemoveCommand());
} }
} }

View File

@ -192,6 +192,7 @@ public class BrokerService implements Service {
// to other jms messaging systems // to other jms messaging systems
private boolean deleteAllMessagesOnStartup; private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true; private boolean advisorySupport = true;
private boolean anonymousProducerAdvisorySupport = false;
private URI vmConnectorURI; private URI vmConnectorURI;
private String defaultSocketURIString; private String defaultSocketURIString;
private PolicyMap destinationPolicy; private PolicyMap destinationPolicy;
@ -1522,6 +1523,14 @@ public class BrokerService implements Service {
this.advisorySupport = advisorySupport; this.advisorySupport = advisorySupport;
} }
public boolean isAnonymousProducerAdvisorySupport() {
return anonymousProducerAdvisorySupport;
}
public void setAnonymousProducerAdvisorySupport(boolean anonymousProducerAdvisorySupport) {
this.anonymousProducerAdvisorySupport = anonymousProducerAdvisorySupport;
}
public List<TransportConnector> getTransportConnectors() { public List<TransportConnector> getTransportConnectors() {
return new ArrayList<>(transportConnectors); return new ArrayList<>(transportConnectors);
} }

View File

@ -36,6 +36,7 @@ public final class AdvisorySupport {
public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer."; public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue."; public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic."; public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
public static final String ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Anonymous";
public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer."; public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
public static final String VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "VirtualDestination.Consumer."; public static final String VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "VirtualDestination.Consumer.";
public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue."; public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
@ -137,7 +138,9 @@ public final class AdvisorySupport {
public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) { public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
String prefix; String prefix;
if (destination.isQueue()) { if (destination == null) {
prefix = ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX;
} else if (destination.isQueue()) {
prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX; prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX;
} else { } else {
prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX; prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
@ -146,7 +149,8 @@ public final class AdvisorySupport {
} }
private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) { private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) {
return new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "&sbquo;")); return destination != null ? new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "&sbquo;")):
new ActiveMQTopic(prefix);
} }
public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException { public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {

View File

@ -175,7 +175,6 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection2); assertNoMessagesLeft(connection2);
} }
public void testProducerAdvisories() throws Exception { public void testProducerAdvisories() throws Exception {
ActiveMQDestination queue = new ActiveMQQueue("test"); ActiveMQDestination queue = new ActiveMQQueue("test");
@ -319,6 +318,105 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection1); assertNoMessagesLeft(connection1);
} }
public void testAnonymousProducerAdvisoriesTrue() throws Exception {
//turn on support for anonymous producers
broker.setAnonymousProducerAdvisorySupport(true);
ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(null);
assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, destination.getPhysicalName());
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(100);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(consumerInfo1);
assertNoMessagesLeft(connection1);
// Setup a producer.
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
//don't set a destination
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.send(producerInfo2);
// We should get an advisory of the new produver.
Message m1 = receiveMessage(connection1);
assertNotNull(m1);
assertNotNull(m1.getDataStructure());
assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId());
assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, m1.getDestination().getPhysicalName());
// Close the second connection.
connection2.request(closeConnectionInfo(connectionInfo2));
connection2.stop();
// We should get an advisory of the producer closing
m1 = receiveMessage(connection1);
assertNotNull(m1);
assertNotNull(m1.getDataStructure());
RemoveInfo r = (RemoveInfo) m1.getDataStructure();
assertEquals(r.getObjectId(), producerInfo2.getProducerId());
assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, m1.getDestination().getPhysicalName());
assertNoMessagesLeft(connection2);
}
public void testAnonymousProducerAdvisoriesFalse() throws Exception {
broker.setAnonymousProducerAdvisorySupport(false);
assertAnonymousProducerAdvisoriesOff();
}
public void testAnonymousProducerAdvisoriesDefault() throws Exception {
//Default for now is to have anonymous producer advisories turned off
assertAnonymousProducerAdvisoriesOff();
}
private void assertAnonymousProducerAdvisoriesOff() throws Exception {
ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(null);
assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, destination.getPhysicalName());
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(100);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(consumerInfo1);
assertNoMessagesLeft(connection1);
// Setup a producer.
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
//don't set a destination
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.send(producerInfo2);
// We should get an advisory of the new produver.
Message m1 = receiveMessage(connection1, 1000);
assertNull(m1);
assertNoMessagesLeft(connection2);
}
public static Test suite() { public static Test suite() {
return suite(AdvisoryBrokerTest.class); return suite(AdvisoryBrokerTest.class);
} }