diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 1508c619ec..f2d41e9e1a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -229,8 +229,10 @@ public class AdvisoryBroker extends BrokerFilter { public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { super.addProducer(context, info); - // Don't advise advisory topics. - if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { + //Verify destination is either non-null or that we want to advise anonymous producers on null destination + //Don't advise advisory topics. + if ((info.getDestination() != null || getBrokerService().isAnonymousProducerAdvisorySupport()) + && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); fireProducerAdvisory(context, info.getDestination(), topic, info); producers.put(info.getProducerId(), info); @@ -412,12 +414,13 @@ public class AdvisoryBroker extends BrokerFilter { public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { super.removeProducer(context, info); - // Don't advise advisory topics. + //Verify destination is either non-null or that we want to advise anonymous producers on null destination + //Don't advise advisory topics. ActiveMQDestination dest = info.getDestination(); - if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) { + if ((dest != null || getBrokerService().isAnonymousProducerAdvisorySupport()) && !AdvisorySupport.isAdvisoryTopic(dest)) { ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest); producers.remove(info.getProducerId()); - if (!dest.isTemporary() || destinations.containsKey(dest)) { + if (dest == null || !dest.isTemporary() || destinations.containsKey(dest)) { fireProducerAdvisory(context, dest, topic, info.createRemoveCommand()); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 0b05d3132e..df27da1566 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -192,6 +192,7 @@ public class BrokerService implements Service { // to other jms messaging systems private boolean deleteAllMessagesOnStartup; private boolean advisorySupport = true; + private boolean anonymousProducerAdvisorySupport = false; private URI vmConnectorURI; private String defaultSocketURIString; private PolicyMap destinationPolicy; @@ -1522,6 +1523,14 @@ public class BrokerService implements Service { this.advisorySupport = advisorySupport; } + public boolean isAnonymousProducerAdvisorySupport() { + return anonymousProducerAdvisorySupport; + } + + public void setAnonymousProducerAdvisorySupport(boolean anonymousProducerAdvisorySupport) { + this.anonymousProducerAdvisorySupport = anonymousProducerAdvisorySupport; + } + public List getTransportConnectors() { return new ArrayList<>(transportConnectors); } diff --git a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index 402cad4553..c030acded2 100644 --- a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -36,6 +36,7 @@ public final class AdvisorySupport { public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer."; public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue."; public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic."; + public static final String ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Anonymous"; public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer."; public static final String VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "VirtualDestination.Consumer."; public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue."; @@ -137,7 +138,9 @@ public final class AdvisorySupport { public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) { String prefix; - if (destination.isQueue()) { + if (destination == null) { + prefix = ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX; + } else if (destination.isQueue()) { prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX; } else { prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX; @@ -146,7 +149,8 @@ public final class AdvisorySupport { } private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) { - return new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "‚")); + return destination != null ? new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "‚")): + new ActiveMQTopic(prefix); } public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java index c65dc53931..438d6494f0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java @@ -175,7 +175,6 @@ public class AdvisoryBrokerTest extends BrokerTestSupport { assertNoMessagesLeft(connection2); } - public void testProducerAdvisories() throws Exception { ActiveMQDestination queue = new ActiveMQQueue("test"); @@ -319,6 +318,105 @@ public class AdvisoryBrokerTest extends BrokerTestSupport { assertNoMessagesLeft(connection1); } + public void testAnonymousProducerAdvisoriesTrue() throws Exception { + //turn on support for anonymous producers + broker.setAnonymousProducerAdvisorySupport(true); + + ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(null); + assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, destination.getPhysicalName()); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(100); + + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(consumerInfo1); + + assertNoMessagesLeft(connection1); + + // Setup a producer. + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); + //don't set a destination + + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.send(producerInfo2); + + // We should get an advisory of the new produver. + Message m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId()); + assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, m1.getDestination().getPhysicalName()); + + // Close the second connection. + connection2.request(closeConnectionInfo(connectionInfo2)); + connection2.stop(); + + // We should get an advisory of the producer closing + m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + RemoveInfo r = (RemoveInfo) m1.getDataStructure(); + assertEquals(r.getObjectId(), producerInfo2.getProducerId()); + assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, m1.getDestination().getPhysicalName()); + + assertNoMessagesLeft(connection2); + } + + public void testAnonymousProducerAdvisoriesFalse() throws Exception { + broker.setAnonymousProducerAdvisorySupport(false); + + assertAnonymousProducerAdvisoriesOff(); + } + + public void testAnonymousProducerAdvisoriesDefault() throws Exception { + //Default for now is to have anonymous producer advisories turned off + assertAnonymousProducerAdvisoriesOff(); + } + + private void assertAnonymousProducerAdvisoriesOff() throws Exception { + ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(null); + assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, destination.getPhysicalName()); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(100); + + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(consumerInfo1); + + assertNoMessagesLeft(connection1); + + // Setup a producer. + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); + //don't set a destination + + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.send(producerInfo2); + + // We should get an advisory of the new produver. + Message m1 = receiveMessage(connection1, 1000); + assertNull(m1); + + assertNoMessagesLeft(connection2); + } + public static Test suite() { return suite(AdvisoryBrokerTest.class); }