mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2443 - sendAdvisoryIfNoConsumers
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@822797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dd957fae8c
commit
27183f8cc8
|
@ -25,6 +25,7 @@ import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.command.MessageDispatchNotification;
|
import org.apache.activemq.command.MessageDispatchNotification;
|
||||||
|
@ -471,13 +472,14 @@ public abstract class BaseDestination implements Destination {
|
||||||
* Provides a hook to allow messages with no consumer to be processed in
|
* Provides a hook to allow messages with no consumer to be processed in
|
||||||
* some way - such as to send to a dead letter queue or something..
|
* some way - such as to send to a dead letter queue or something..
|
||||||
*/
|
*/
|
||||||
protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception {
|
protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
|
||||||
if (!message.isPersistent()) {
|
if (!msg.isPersistent()) {
|
||||||
if (isSendAdvisoryIfNoConsumers()) {
|
if (isSendAdvisoryIfNoConsumers()) {
|
||||||
// allow messages with no consumers to be dispatched to a dead
|
// allow messages with no consumers to be dispatched to a dead
|
||||||
// letter queue
|
// letter queue
|
||||||
if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
|
if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
|
||||||
|
|
||||||
|
Message message = msg.copy();
|
||||||
// The original destination and transaction id do not get
|
// The original destination and transaction id do not get
|
||||||
// filled when the message is first sent,
|
// filled when the message is first sent,
|
||||||
// it is only populated if the message is routed to another
|
// it is only populated if the message is routed to another
|
||||||
|
|
|
@ -16,14 +16,23 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.policy;
|
package org.apache.activemq.broker.policy;
|
||||||
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.advisory.AdvisorySupport;
|
import org.apache.activemq.advisory.AdvisorySupport;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @version $Revision$
|
* @version $Revision$
|
||||||
|
@ -47,6 +56,38 @@ public class NoConsumerDeadLetterTest extends DeadLetterTestSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testConsumerReceivesMessages() throws Exception {
|
||||||
|
this.topic = false;
|
||||||
|
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
|
||||||
|
connection = (ActiveMQConnection)factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(getDestination());
|
||||||
|
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||||
|
|
||||||
|
Topic advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(getDestination());
|
||||||
|
MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
|
||||||
|
|
||||||
|
TextMessage msg = session.createTextMessage("Message: x");
|
||||||
|
producer.send(msg);
|
||||||
|
|
||||||
|
Message advisoryMessage = advisoryConsumer.receive(1000);
|
||||||
|
assertNotNull("Advisory message not received", advisoryMessage);
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
factory = (ActiveMQConnectionFactory)createConnectionFactory();
|
||||||
|
connection = (ActiveMQConnection)factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(getDestination());
|
||||||
|
Message received = consumer.receive(1000);
|
||||||
|
assertNotNull("Message not received", received);
|
||||||
|
}
|
||||||
|
|
||||||
protected BrokerService createBroker() throws Exception {
|
protected BrokerService createBroker() throws Exception {
|
||||||
BrokerService broker = super.createBroker();
|
BrokerService broker = super.createBroker();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue