git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@745031 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-02-17 12:56:24 +00:00
parent e48ff70d4b
commit 9ad6c089fa
5 changed files with 79 additions and 57 deletions

View File

@ -17,12 +17,18 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@ -57,6 +63,7 @@ public abstract class BaseDestination implements Destination {
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
private boolean sendAdvisoryIfNoConsumers;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService;
protected final Broker regionBroker;
@ -323,6 +330,14 @@ public abstract class BaseDestination implements Destination {
public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
public boolean isSendAdvisoryIfNoConsumers() {
return sendAdvisoryIfNoConsumers;
}
public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
}
/**
* @return the dead letter strategy
@ -420,4 +435,54 @@ public abstract class BaseDestination implements Destination {
this.destinationStatistics.setParent(null);
this.memoryUsage.stop();
}
/**
* Provides a hook to allow messages with no consumer to be processed in
* some way - such as to send to a dead letter queue or something..
*/
protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception {
if (!message.isPersistent()) {
if (isSendAdvisoryIfNoConsumers()) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
// The original destination and transaction id do not get
// filled when the message is first sent,
// it is only populated if the message is routed to another
// destination like the DLQ
if (message.getOriginalDestination() != null) {
message.setOriginalDestination(message.getDestination());
}
if (message.getOriginalTransactionId() != null) {
message.setOriginalTransactionId(message.getTransactionId());
}
ActiveMQTopic advisoryTopic;
if (destination.isQueue()) {
advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
} else {
advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
}
message.setDestination(advisoryTopic);
message.setTransactionId(null);
// Disable flow control for this since since we don't want
// to block.
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setMutable(false);
producerExchange.setConnectionContext(context);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
context.getBroker().send(producerExchange, message);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
}
}
}
}
}

View File

@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -50,6 +51,7 @@ import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
@ -62,6 +64,7 @@ import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.DeterministicTaskRunner;
@ -1210,6 +1213,11 @@ public class Queue extends BaseDestination implements Task {
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
messageDelivered(context, msg);
synchronized (consumers) {
if (consumers.isEmpty()) {
onMessageWithNoConsumers(context, msg);
}
}
wakeup();
}

View File

@ -70,7 +70,6 @@ public class Topic extends BaseDestination implements Task{
protected final Valve dispatchValve = new Valve(true);
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
@ -541,14 +540,6 @@ public class Topic extends BaseDestination implements Task{
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
}
public boolean isSendAdvisoryIfNoConsumers() {
return sendAdvisoryIfNoConsumers;
}
public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
}
// Implementation methods
// -------------------------------------------------------------------------
@ -601,48 +592,5 @@ public class Topic extends BaseDestination implements Task{
}
}
/**
* Provides a hook to allow messages with no consumer to be processed in
* some way - such as to send to a dead letter queue or something..
*/
protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception {
if (!message.isPersistent()) {
if (sendAdvisoryIfNoConsumers) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
// The original destination and transaction id do not get
// filled when the message is first sent,
// it is only populated if the message is routed to another
// destination like the DLQ
if (message.getOriginalDestination() != null) {
message.setOriginalDestination(message.getDestination());
}
if (message.getOriginalTransactionId() != null) {
message.setOriginalTransactionId(message.getTransactionId());
}
ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
message.setDestination(advisoryTopic);
message.setTransactionId(null);
// Disable flow control for this since since we don't want
// to block.
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setMutable(false);
producerExchange.setConnectionContext(context);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
context.getBroker().send(producerExchange, message);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
}
}
}
}
}

View File

@ -110,7 +110,6 @@ public class PolicyEntry extends DestinationMapEntry {
if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
}
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
if (memoryLimit > 0) {
topic.getMemoryUsage().setLimit(memoryLimit);
}
@ -132,6 +131,7 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {

View File

@ -37,9 +37,6 @@ public class NoConsumerDeadLetterTest extends DeadLetterTestSupport {
public void testDurableTopicMessage() throws Exception {
}
public void testTransientQueueMessage() throws Exception {
}
protected void doTest() throws Exception {
makeDlqConsumer();
sendMessages();
@ -65,7 +62,11 @@ public class NoConsumerDeadLetterTest extends DeadLetterTestSupport {
}
protected Destination createDlqDestination() {
return AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination)getDestination());
if (this.topic) {
return AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination)getDestination());
} else {
return AdvisorySupport.getNoQueueConsumersAdvisoryTopic((ActiveMQDestination)getDestination());
}
}
}