diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 87f7a80f2c..17e934ff21 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -77,7 +77,7 @@ public class TopicSubscription extends AbstractSubscription { super(broker, context, info); this.usageManager = usageManager; String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; - if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) { + if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) { this.matched = new VMPendingMessageCursor(false); } else { this.matched = new FilePendingMessageCursor(broker,matchedName,false); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java index 66e77ece53..244136436e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.policy; import java.util.ArrayList; import java.util.List; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicSubscription; @@ -27,6 +28,8 @@ import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.usage.SystemUsage; +import org.apache.derby.iapi.jdbc.BrokeredStatement; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -41,6 +44,7 @@ public class PriorityNetworkDispatchPolicyTest { ActiveMQMessage node = new ActiveMQMessage(); ConsumerId id = new ConsumerId(); ConnectionContext context = new ConnectionContext(); + BrokerService brokerService = new BrokerService(); @Before public void init() throws Exception { @@ -50,14 +54,20 @@ public class PriorityNetworkDispatchPolicyTest { info.setNetworkConsumerPath(new ConsumerId[]{id}); } + @After + public void stopBroker() throws Exception { + brokerService.stop(); + } + @Test public void testRemoveLowerPriorityDup() throws Exception { + List consumers = new ArrayList(); for (int i=0; i<3; i++) { ConsumerInfo instance = info.copy(); instance.setPriority((byte)i); - consumers.add(new TopicSubscription(null, context, instance, usageManager)); + consumers.add(new TopicSubscription(brokerService.getBroker(), context, instance, usageManager)); } underTest.dispatch(node, null, consumers);