minor refactor to provide a hook when dispatching messages which have no consumers; so that we can for example, send them to a dead letter queue

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@359547 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2005-12-28 15:33:05 +00:00
parent 1e0bbfc3ed
commit e58897bd69
5 changed files with 30 additions and 8 deletions

View File

@ -273,13 +273,17 @@ public class Topic implements Destination {
if (! subscriptionRecoveryPolicy.add(context, message)) {
return;
}
if (consumers.isEmpty())
if (consumers.isEmpty()) {
onMessageWithNoConsumers(context, message);
return;
}
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
dispatchPolicy.dispatch(context, message, msgContext, consumers);
if (!dispatchPolicy.dispatch(context, message, msgContext, consumers)) {
onMessageWithNoConsumers(context, message);
}
}
finally {
msgContext.clear();
@ -287,6 +291,15 @@ public class Topic implements Destination {
}
}
/**
* Provides a hook to allow messages with no consumer to be processed in some way - such as to send to a dead letter queue or something..
*/
protected void onMessageWithNoConsumers(ConnectionContext context, Message message) {
if (! message.isPersistent()) {
// allow messages with no consumers to be dispatched to a dead letter queue
}
}
public MessageStore getMessageStore() {
return store;
}

View File

@ -41,7 +41,9 @@ public interface DispatchPolicy {
* large pre-fetch may take all the messages if he is always dispatched to first.
* Once a message has been locked, it does not need to be dispatched to any
* further subscriptions.
*
* @return true if at least one consumer was dispatched or false if there are no active subscriptions that could be dispatched
*/
void dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable;
boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable;
}

View File

@ -37,12 +37,13 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
private final Object mutex = new Object();
public void dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
// Big synch here so that only 1 message gets dispatched at a time. Ensures
// Everyone sees the same order and that the consumer list is not used while
// it's being rotated.
synchronized(mutex) {
int count = 0;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
@ -52,6 +53,7 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
continue;
sub.add(node);
count++;
}
// Rotate the consumer list.
@ -59,6 +61,7 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
consumers.add(consumers.remove(0));
} catch (Throwable bestEffort) {
}
return count > 0;
}
}

View File

@ -35,8 +35,8 @@ import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
*/
public class SimpleDispatchPolicy implements DispatchPolicy {
public void dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
public boolean dispatch(ConnectionContext context, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
int count = 0;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
@ -48,7 +48,9 @@ public class SimpleDispatchPolicy implements DispatchPolicy {
continue;
sub.add(node);
count++;
}
return count > 0;
}
}

View File

@ -36,11 +36,11 @@ public class StrictOrderDispatchPolicy implements DispatchPolicy {
int i=0;
private final Object mutex = new Object();
public void dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
// Big synch here so that only 1 message gets dispatched at a time. Ensures
// Everyone sees the same order.
synchronized(mutex) {
int count = 0;
i++;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
@ -50,7 +50,9 @@ public class StrictOrderDispatchPolicy implements DispatchPolicy {
continue;
sub.add(node);
count++;
}
return count > 0;
}
}