diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 0cc7cd7e17..713199e4f6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -1067,6 +1067,11 @@ public class BrokerService implements Service { */ public synchronized Store getTempDataStore() { if (tempDataStore == null) { + + if (!isPersistent()) { + return null; + } + boolean result = true; boolean empty = true; try { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index a31819c651..91f04f005a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -118,7 +118,7 @@ public class Queue implements Destination, Task { this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString()); this.memoryUsage.setUsagePortion(1.0f); this.store = store; - if (destination.isTemporary()) { + if (destination.isTemporary() || tmpStore==null ) { this.messages = new VMPendingMessageCursor(); } else { this.messages = new StoreQueueCursor(this, tmpStore); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index fea09baf89..5d4a6d478b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -26,6 +26,7 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; import org.apache.activemq.command.ConsumerControl; @@ -36,6 +37,7 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.Response; +import org.apache.activemq.kaha.Store; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.usage.SystemUsage; import org.apache.commons.logging.Log; @@ -66,8 +68,12 @@ public class TopicSubscription extends AbstractSubscription { super(broker, context, info); this.usageManager = usageManager; String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; - this.matched = new FilePendingMessageCursor(matchedName, broker.getTempDataStore()); - + Store tempDataStore = broker.getTempDataStore(); + if (tempDataStore != null) { + this.matched = new FilePendingMessageCursor(matchedName, tempDataStore); + } else { + this.matched = new VMPendingMessageCursor(); + } } public void init() throws Exception {