git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@692134 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-09-04 16:04:27 +00:00
parent 391077ee15
commit 3e2b3d25fe
1 changed files with 9 additions and 8 deletions

View File

@ -17,10 +17,10 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Set;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
@ -28,11 +28,9 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
@ -564,7 +562,8 @@ public class Topic extends BaseDestination implements Task{
protected void dispatch(final ConnectionContext context, Message message) throws Exception {
destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
dispatchValve.increment();
dispatchValve.increment();
MessageEvaluationContext msgContext = null;
try {
if (!subscriptionRecoveryPolicy.add(context, message)) {
return;
@ -575,16 +574,18 @@ public class Topic extends BaseDestination implements Task{
return;
}
}
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
msgContext = context.getMessageEvaluationContext();
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
onMessageWithNoConsumers(context, message);
}
msgContext.clear();
}
} finally {
dispatchValve.decrement();
if(msgContext != null) {
msgContext.clear();
}
}
}