enable by default that non-persistent topic messages which have no consumers are sent to a dead letter topic

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@359800 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2005-12-29 14:12:46 +00:00
parent ff457e227a
commit c2408b8997
3 changed files with 84 additions and 23 deletions

View File

@ -29,6 +29,10 @@ public class AdvisorySupport {
public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX+"TempTopic"); public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX+"TempTopic");
public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Producer."; public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Producer.";
public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Consumer."; public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Consumer.";
public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Topic.";
public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Queue.";
public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Topic.";
public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Queue.";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory"; public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC); public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC);
@ -46,6 +50,26 @@ public class AdvisorySupport {
return new ActiveMQTopic(name); return new ActiveMQTopic(name);
} }
public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) { public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
switch( destination.getDestinationType() ) { switch( destination.getDestinationType() ) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:

View File

@ -18,12 +18,14 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
@ -57,9 +59,10 @@ public class Topic implements Destination {
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
private boolean sendAdvisoryIfNoConsumers = true;
public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) { TaskRunnerFactory taskFactory) {
this.destination = destination; this.destination = destination;
this.store = store; this.store = store;
@ -166,33 +169,34 @@ public class Topic implements Destination {
public void send(final ConnectionContext context, final Message message) throws Throwable { public void send(final ConnectionContext context, final Message message) throws Throwable {
if( context.isProducerFlowControl() ) if (context.isProducerFlowControl())
usageManager.waitForSpace(); usageManager.waitForSpace();
message.setRegionDestination(this); message.setRegionDestination(this);
if (store != null && message.isPersistent()) if (store != null && message.isPersistent())
store.addMessage(context, message); store.addMessage(context, message);
message.incrementReferenceCount(); message.incrementReferenceCount();
try { try {
if (context.isInTransaction()) { if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Throwable { public void afterCommit() throws Throwable {
dispatch(context, message); dispatch(context, message);
} }
}); });
} }
else { else {
dispatch(context, message); dispatch(context, message);
} }
} finally { }
finally {
message.decrementReferenceCount(); message.decrementReferenceCount();
} }
} }
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException { public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
@ -236,7 +240,7 @@ public class Topic implements Destination {
// Properties // Properties
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
public UsageManager getUsageManager() { public UsageManager getUsageManager() {
return usageManager; return usageManager;
} }
@ -265,12 +269,26 @@ public class Topic implements Destination {
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
} }
public boolean isSendAdvisoryIfNoConsumers() {
return sendAdvisoryIfNoConsumers;
}
public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
}
public MessageStore getMessageStore() {
return store;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void dispatch(ConnectionContext context, Message message) throws Throwable { protected void dispatch(ConnectionContext context, Message message) throws Throwable {
destinationStatistics.getEnqueues().increment(); destinationStatistics.getEnqueues().increment();
dispatchValve.increment(); dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try { try {
if (! subscriptionRecoveryPolicy.add(context, message)) { if (!subscriptionRecoveryPolicy.add(context, message)) {
return; return;
} }
if (consumers.isEmpty()) { if (consumers.isEmpty()) {
@ -280,7 +298,7 @@ public class Topic implements Destination {
msgContext.setDestination(destination); msgContext.setDestination(destination);
msgContext.setMessageReference(message); msgContext.setMessageReference(message);
if (!dispatchPolicy.dispatch(context, message, msgContext, consumers)) { if (!dispatchPolicy.dispatch(context, message, msgContext, consumers)) {
onMessageWithNoConsumers(context, message); onMessageWithNoConsumers(context, message);
} }
@ -291,17 +309,23 @@ public class Topic implements Destination {
} }
} }
/** /**
* Provides a hook to allow messages with no consumer to be processed in some way - such as to send to a dead letter queue or something.. * Provides a hook to allow messages with no consumer to be processed in
* some way - such as to send to a dead letter queue or something..
*/ */
protected void onMessageWithNoConsumers(ConnectionContext context, Message message) { protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Throwable {
if (! message.isPersistent()) { if (!message.isPersistent()) {
// allow messages with no consumers to be dispatched to a dead letter queue if (sendAdvisoryIfNoConsumers) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
ActiveMQDestination originalDestination = message.getDestination();
if (!AdvisorySupport.isAdvisoryTopic(originalDestination)) {
ActiveMQTopic advisoryTopic = AdvisorySupport.getExpiredTopicMessageAdvisoryTopic(originalDestination);
message.setDestination(advisoryTopic);
context.getBroker().send(context, message);
}
}
} }
} }
public MessageStore getMessageStore() {
return store;
}
} }

View File

@ -23,7 +23,7 @@ import org.apache.activemq.filter.DestinationMapEntry;
/** /**
* Represents an entry in a {@link PolicyMap} for assigning policies to a * Represents an entry in a {@link PolicyMap} for assigning policies to a
* specific destination or a hierarchial wildcard area of destinations. * specific destination or a hierarchical wildcard area of destinations.
* *
* @org.xbean.XBean * @org.xbean.XBean
* *
@ -34,6 +34,7 @@ public class PolicyEntry extends DestinationMapEntry {
private DispatchPolicy dispatchPolicy; private DispatchPolicy dispatchPolicy;
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private RedeliveryPolicy redeliveryPolicy; private RedeliveryPolicy redeliveryPolicy;
private boolean sendAdvisoryIfNoConsumers = true;
public void configure(Queue queue) { public void configure(Queue queue) {
if (dispatchPolicy != null) { if (dispatchPolicy != null) {
@ -48,6 +49,7 @@ public class PolicyEntry extends DestinationMapEntry {
if (subscriptionRecoveryPolicy != null) { if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy); topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy);
} }
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
} }
// Properties // Properties
@ -76,4 +78,15 @@ public class PolicyEntry extends DestinationMapEntry {
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
} }
public boolean isSendAdvisoryIfNoConsumers() {
return sendAdvisoryIfNoConsumers;
}
/**
* Sends an advisory message if a non-persistent message is sent and there
* are no active consumers
*/
public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
}
} }