From 27183f8cc83a99abcb29000504f1c0e6ada56eb3 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 7 Oct 2009 16:37:28 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-2443 - sendAdvisoryIfNoConsumers git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@822797 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/BaseDestination.java | 6 ++- .../policy/NoConsumerDeadLetterTest.java | 41 +++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index a5cfda3953..ee88c8eb68 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -25,6 +25,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageDispatchNotification; @@ -471,13 +472,14 @@ public abstract class BaseDestination implements Destination { * Provides a hook to allow messages with no consumer to be processed in * some way - such as to send to a dead letter queue or something.. */ - protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception { - if (!message.isPersistent()) { + protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { + if (!msg.isPersistent()) { if (isSendAdvisoryIfNoConsumers()) { // allow messages with no consumers to be dispatched to a dead // letter queue if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { + Message message = msg.copy(); // The original destination and transaction id do not get // filled when the message is first sent, // it is only populated if the message is routed to another diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java index 0ecfe9d5b6..4a6c9e88bc 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java @@ -16,14 +16,23 @@ */ package org.apache.activemq.broker.policy; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; /** * @version $Revision$ @@ -46,6 +55,38 @@ public class NoConsumerDeadLetterTest extends DeadLetterTestSupport { assertNotNull("Should be a message for loop: " + i, msg); } } + + public void testConsumerReceivesMessages() throws Exception { + this.topic = false; + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); + connection = (ActiveMQConnection)factory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(getDestination()); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + Topic advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(getDestination()); + MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic); + + TextMessage msg = session.createTextMessage("Message: x"); + producer.send(msg); + + Message advisoryMessage = advisoryConsumer.receive(1000); + assertNotNull("Advisory message not received", advisoryMessage); + + Thread.sleep(1000); + + factory = (ActiveMQConnectionFactory)createConnectionFactory(); + connection = (ActiveMQConnection)factory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(getDestination()); + Message received = consumer.receive(1000); + assertNotNull("Message not received", received); + } protected BrokerService createBroker() throws Exception { BrokerService broker = super.createBroker();