mirror of https://github.com/apache/activemq.git
imporoved the advisory message being sent by topics when no consumer is listening:
- It sends it non transactional - It does does not block due to flow control git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@360329 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
77dd8fb71d
commit
61ab31e758
|
@ -152,6 +152,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
|
||||||
|
|
||||||
} else if( ack.isPoisonAck() ) {
|
} else if( ack.isPoisonAck() ) {
|
||||||
|
|
||||||
|
// TODO: what if the message is already in a DLQ???
|
||||||
|
|
||||||
// Handle the poison ACK case: we need to send the message to a DLQ
|
// Handle the poison ACK case: we need to send the message to a DLQ
|
||||||
if( ack.isInTransaction() )
|
if( ack.isInTransaction() )
|
||||||
throw new JMSException("Poison ack cannot be transacted: "+ack);
|
throw new JMSException("Poison ack cannot be transacted: "+ack);
|
||||||
|
@ -175,20 +177,16 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
|
||||||
Message message = node.getMessage();
|
Message message = node.getMessage();
|
||||||
if( message !=null ) {
|
if( message !=null ) {
|
||||||
|
|
||||||
// TODO is this meant to be == null?
|
// The original destination and transaction id do not get filled when the message is first sent,
|
||||||
|
// it is only populated if the message is routed to another destination like the DLQ
|
||||||
if( message.getOriginalDestination()!=null )
|
if( message.getOriginalDestination()!=null )
|
||||||
message.setOriginalDestination(message.getDestination());
|
message.setOriginalDestination(message.getDestination());
|
||||||
|
|
||||||
ActiveMQDestination originalDestination = message.getOriginalDestination();
|
|
||||||
if (originalDestination == null) {
|
|
||||||
originalDestination = message.getDestination();
|
|
||||||
}
|
|
||||||
DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy();
|
|
||||||
ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(originalDestination);
|
|
||||||
message.setDestination(deadLetterDestination);
|
|
||||||
|
|
||||||
if( message.getOriginalTransactionId()!=null )
|
if( message.getOriginalTransactionId()!=null )
|
||||||
message.setOriginalTransactionId(message.getTransactionId());
|
message.setOriginalTransactionId(message.getTransactionId());
|
||||||
|
|
||||||
|
DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy();
|
||||||
|
ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
|
||||||
|
message.setDestination(deadLetterDestination);
|
||||||
message.setTransactionId(null);
|
message.setTransactionId(null);
|
||||||
message.evictMarshlledForm();
|
message.evictMarshlledForm();
|
||||||
|
|
||||||
|
|
|
@ -338,11 +338,29 @@ public class Topic implements Destination {
|
||||||
if (sendAdvisoryIfNoConsumers) {
|
if (sendAdvisoryIfNoConsumers) {
|
||||||
// allow messages with no consumers to be dispatched to a dead
|
// allow messages with no consumers to be dispatched to a dead
|
||||||
// letter queue
|
// letter queue
|
||||||
ActiveMQDestination originalDestination = message.getDestination();
|
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
|
||||||
if (!AdvisorySupport.isAdvisoryTopic(originalDestination)) {
|
|
||||||
ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(originalDestination);
|
// The original destination and transaction id do not get filled when the message is first sent,
|
||||||
|
// it is only populated if the message is routed to another destination like the DLQ
|
||||||
|
if( message.getOriginalDestination()!=null )
|
||||||
|
message.setOriginalDestination(message.getDestination());
|
||||||
|
if( message.getOriginalTransactionId()!=null )
|
||||||
|
message.setOriginalTransactionId(message.getTransactionId());
|
||||||
|
|
||||||
|
ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
|
||||||
message.setDestination(advisoryTopic);
|
message.setDestination(advisoryTopic);
|
||||||
context.getBroker().send(context, message);
|
message.setTransactionId(null);
|
||||||
|
message.evictMarshlledForm();
|
||||||
|
|
||||||
|
// Disable flow control for this since since we don't want to block.
|
||||||
|
boolean originalFlowControl = context.isProducerFlowControl();
|
||||||
|
try {
|
||||||
|
context.setProducerFlowControl(false);
|
||||||
|
context.getBroker().send(context, message);
|
||||||
|
} finally {
|
||||||
|
context.setProducerFlowControl(originalFlowControl);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue