From 3e2b3d25fe54034066d5a7bef3cdbd013a1f5232 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Thu, 4 Sep 2008 16:04:27 +0000 Subject: [PATCH] Applied patch for https://issues.apache.org/activemq/browse/AMQ-1833 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@692134 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Topic.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index fc9ef9ce8c..3d124973a8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -17,10 +17,10 @@ package org.apache.activemq.broker.region; import java.io.IOException; -import java.util.LinkedList; -import java.util.Set; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; @@ -28,11 +28,9 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; -import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; import org.apache.activemq.command.ActiveMQDestination; @@ -564,7 +562,8 @@ public class Topic extends BaseDestination implements Task{ protected void dispatch(final ConnectionContext context, Message message) throws Exception { destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); - dispatchValve.increment(); + dispatchValve.increment(); + MessageEvaluationContext msgContext = null; try { if (!subscriptionRecoveryPolicy.add(context, message)) { return; @@ -575,16 +574,18 @@ public class Topic extends BaseDestination implements Task{ return; } } - MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); + msgContext = context.getMessageEvaluationContext(); msgContext.setDestination(destination); msgContext.setMessageReference(message); if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { onMessageWithNoConsumers(context, message); - } - msgContext.clear(); + } } finally { dispatchValve.decrement(); + if(msgContext != null) { + msgContext.clear(); + } } }