diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index f41f9a1647..b2321ff6e1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -93,7 +93,7 @@ public class BrokerView implements BrokerViewMBean { return broker.getDestinationStatistics().getMessagesCached().getCount(); } - public int getMemoryPercentageUsed() { + public int getMemoryPercentUsage() { return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage(); } @@ -109,16 +109,16 @@ public class BrokerView implements BrokerViewMBean { return brokerService.getSystemUsage().getStoreUsage().getLimit(); } - public int getStorePercentageUsed() { + public int getStorePercentUsage() { return brokerService.getSystemUsage().getStoreUsage().getPercentUsage(); } - public long getTmpLimit() { + public long getTempLimit() { return brokerService.getSystemUsage().getTempUsage().getLimit(); } - public int getTmpPercentageUsed() { + public int getTempPercentUsage() { return brokerService.getSystemUsage().getTempUsage().getPercentUsage(); } @@ -126,7 +126,7 @@ public class BrokerView implements BrokerViewMBean { brokerService.getSystemUsage().getStoreUsage().setLimit(limit); } - public void setTmpLimit(long limit) { + public void setTempLimit(long limit) { brokerService.getSystemUsage().getTempUsage().setLimit(limit); } @@ -172,7 +172,7 @@ public class BrokerView implements BrokerViewMBean { } public ObjectName[] getTopicSubscribers() { - return broker.getTemporaryTopicSubscribers(); + return broker.getTopicSubscribers(); } public ObjectName[] getDurableTopicSubscribers() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java index 1af04e18e7..6589489aa5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java @@ -61,23 +61,23 @@ public interface BrokerViewMBean extends Service { long getTotalMessageCount(); - int getMemoryPercentageUsed(); + int getMemoryPercentUsage(); long getMemoryLimit(); void setMemoryLimit(long limit); - int getStorePercentageUsed(); + int getStorePercentUsage(); long getStoreLimit(); void setStoreLimit(long limit); - int getTmpPercentageUsed(); + int getTempPercentUsage(); - long getTmpLimit(); + long getTempLimit(); - void setTmpLimit(long limit); + void setTempLimit(long limit); boolean isPersistent(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index c618c36108..7dc51244f4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -93,7 +93,7 @@ public class DestinationView implements DestinationViewMBean { return destination.getDestinationStatistics().getMessagesCached().getCount(); } - public int getMemoryPercentageUsed() { + public int getMemoryPercentUsage() { return destination.getMemoryUsage().getPercentUsage(); } @@ -294,7 +294,7 @@ public class DestinationView implements DestinationViewMBean { } - public float getMemoryLimitPortion() { + public float getMemoryUsagePortion() { return destination.getMemoryUsage().getUsagePortion(); } @@ -306,7 +306,7 @@ public class DestinationView implements DestinationViewMBean { return destination.isProducerFlowControl(); } - public void setMemoryLimitPortion(float value) { + public void setMemoryUsagePortion(float value) { destination.getMemoryUsage().setUsagePortion(value); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 6bcc0c1dfa..4bc5bd47f3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -127,7 +127,7 @@ public interface DestinationViewMBean { /** * @return the percentage of amount of memory used */ - int getMemoryPercentageUsed(); + int getMemoryPercentUsage(); /** * @return the amount of memory allocated to this destination @@ -143,13 +143,13 @@ public interface DestinationViewMBean { /** * @return the portion of memory from the broker memory limit for this destination */ - float getMemoryLimitPortion(); + float getMemoryUsagePortion(); /** * set the portion of memory from the broker memory limit for this destination * @param value */ - void setMemoryLimitPortion(float value); + void setMemoryUsagePortion(float value); /** * Browses the current destination returning a list of messages diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 7d44b956b4..874653688e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -139,10 +139,7 @@ public abstract class BaseDestination implements Destination { return destination; } - public final String getDestination() { - return destination.getPhysicalName(); - } - + public final String getName() { return getActiveMQDestination().getPhysicalName(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java index f8a985ae28..e514a770f5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java @@ -76,7 +76,7 @@ public class DestinationFactoryImpl extends DestinationFactory { if (destination.isQueue()) { if (destination.isTemporary()) { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; - return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory, broker.getTempDataStore()) { + return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory) { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { // Only consumers on the same connection can consume @@ -90,7 +90,7 @@ public class DestinationFactoryImpl extends DestinationFactory { }; } else { MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination); - Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory, broker.getTempDataStore()); + Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory); configureQueue(queue, destination); queue.initialize(); return queue; @@ -127,7 +127,7 @@ public class DestinationFactoryImpl extends DestinationFactory { if (broker.getDestinationPolicy() != null) { PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) { - entry.configure(queue, broker.getTempDataStore()); + entry.configure(broker,queue); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 97820c23ff..81e7f2ef2e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -52,7 +52,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws JMSException { super(broker,usageManager, context, info); - this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this); + this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this); this.pending.setSystemUsage(usageManager); this.keepDurableSubsActive = keepDurableSubsActive; subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); @@ -218,19 +218,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us node.decrementReferenceCount(); } - public String getSubscriptionName() { - return subscriptionKey.getSubscriptionName(); - } - + public synchronized String toString() { return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension; } - public String getClientId() { - return subscriptionKey.getClientId(); - } - public SubscriptionKey getSubscriptionKey() { return subscriptionKey; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 5f24b95f6a..019d98c646 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -95,14 +95,14 @@ public class Queue extends BaseDestination implements Task { }; }; - public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats, - TaskRunnerFactory taskFactory, Store tmpStore) throws Exception { + public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats, + TaskRunnerFactory taskFactory) throws Exception { super(broker, store, destination,systemUsage, parentStats); - if (destination.isTemporary() || tmpStore==null ) { + if (destination.isTemporary() || broker == null || store==null ) { this.messages = new VMPendingMessageCursor(); } else { - this.messages = new StoreQueueCursor(this, tmpStore); + this.messages = new StoreQueueCursor(broker,this); } this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName()); @@ -318,11 +318,11 @@ public class Queue extends BaseDestination implements Task { final ConnectionContext context = producerExchange.getConnectionContext(); // There is delay between the client sending it and it arriving at the // destination.. it may have expired. - + message.setRegionDestination(this); final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode(); if (message.isExpired()) { - broker.messageExpired(context, message); + broker.getRoot().messageExpired(context, message); //message not added to stats yet //destinationStatistics.getMessages().decrement(); if (sendProducerAck) { @@ -402,6 +402,7 @@ public class Queue extends BaseDestination implements Task { if (log.isDebugEnabled()) { log.debug("Expired message: " + message); } + broker.getRoot().messageExpired(context, message); return; } } @@ -416,7 +417,6 @@ public class Queue extends BaseDestination implements Task { void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { final ConnectionContext context = producerExchange.getConnectionContext(); synchronized (sendLock) { - message.setRegionDestination(this); if (store != null && message.isPersistent()) { while (!systemUsage.getStoreUsage().waitForSpace(1000)) { if (context.getStopping().get()) { @@ -678,11 +678,7 @@ public class Queue extends BaseDestination implements Task { // We should only delete messages that can be locked. if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) { - MessageAck ack = new MessageAck(); - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); - ack.setDestination(destination); - ack.setMessageID(r.getMessageId()); - removeMessage(c, null, r, ack); + removeMessage(c,(IndirectMessageReference) r); } } catch (IOException e) { } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index f836ce5a7c..877121c889 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -705,14 +705,18 @@ public class RegionBroker implements Broker { deadLetterDestination); sent=true; } + }else { + //don't want to warn about failing to send + // if there isn't a dead letter strategy + sent=true; } } } if(sent==false){ - LOG.warn("Failed to send "+node+" to dead letter queue"); + LOG.warn("Failed to send "+node+" to DLQ"); } }catch(Exception e){ - LOG.warn("Failed to pass expired message to dead letter queue",e); + LOG.warn("Caught an exception sending to DLQ: "+node,e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java index 1374930827..2bc61492cb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java @@ -41,7 +41,7 @@ public class TempQueueRegion extends AbstractRegion { protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; - return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory, null) { + return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index e52f7aa7cd..4d8c882fb0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -184,8 +184,8 @@ public class Topic extends BaseDestination implements Task{ } // Recover the durable subscription. - String clientId = subscription.getClientId(); - String subscriptionName = subscription.getSubscriptionName(); + String clientId = subscription.getSubscriptionKey().getClientId(); + String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); String selector = subscription.getConsumerInfo().getSelector(); SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); if (info != null) { @@ -435,7 +435,8 @@ public class Topic extends BaseDestination implements Task{ public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException { if (topicStore != null && node.isPersistent()) { DurableTopicSubscription dsub = (DurableTopicSubscription)sub; - topicStore.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId()); + SubscriptionKey key = dsub.getSubscriptionKey(); + topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 5d4a6d478b..b80815f1ff 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -68,11 +68,10 @@ public class TopicSubscription extends AbstractSubscription { super(broker, context, info); this.usageManager = usageManager; String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; - Store tempDataStore = broker.getTempDataStore(); - if (tempDataStore != null) { - this.matched = new FilePendingMessageCursor(matchedName, tempDataStore); - } else { + if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) { this.matched = new VMPendingMessageCursor(); + } else { + this.matched = new FilePendingMessageCursor(broker,matchedName); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 7bcb9c1a2f..03575809d5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -21,6 +21,9 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.QueueMessageReference; @@ -32,6 +35,8 @@ import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * persist pending messages pending message (messages awaiting dispatch to a @@ -40,14 +45,14 @@ import org.apache.activemq.usage.UsageListener; * @version $Revision$ */ public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { - + private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class); private static final AtomicLong NAME_COUNT = new AtomicLong(); - + protected Broker broker; private Store store; private String name; private LinkedList memoryList = new LinkedList(); private ListContainer diskList; - private Iterator iter; + private Iterator iter; private Destination regionDestination; private boolean iterating; private boolean flushRequired; @@ -58,9 +63,10 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple * @param name * @param store */ - public FilePendingMessageCursor(String name, Store store) { + public FilePendingMessageCursor(Broker broker,String name) { + this.broker = broker; + this.store= broker.getTempDataStore(); this.name = NAME_COUNT.incrementAndGet() + "_" + name; - this.store = store; } public void start() throws Exception { @@ -157,19 +163,39 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple * @param node */ public synchronized void addMessageLast(MessageReference node) { - try { - regionDestination = node.getMessage().getRegionDestination(); - if (isSpaceInMemoryList()) { - memoryList.add(node); - node.incrementReferenceCount(); - } else { - flushToDisk(); - node.decrementReferenceCount(); + if (!node.isExpired()) { + try { + regionDestination = node.getMessage().getRegionDestination(); + if (isDiskListEmpty()) { + if (hasSpace()) { + memoryList.add(node); + node.incrementReferenceCount(); + return; + } + } + if (!hasSpace()) { + if (isDiskListEmpty()) { + expireOldMessages(); + if (hasSpace()) { + memoryList.add(node); + node.incrementReferenceCount(); + return; + } else { + flushToDisk(); + } + } + } systemUsage.getTempUsage().waitForSpace(); - getDiskList().addLast(node); + node.decrementReferenceCount(); + getDiskList().add(node); + + } catch (Exception e) { + LOG.error("Caught an Exception adding a message: " + node + + " first to FilePendingMessageCursor ", e); + throw new RuntimeException(e); } - } catch (Exception e) { - throw new RuntimeException(e); + } else { + discard(node); } } @@ -179,19 +205,39 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple * @param node */ public synchronized void addMessageFirst(MessageReference node) { - try { - regionDestination = node.getMessage().getRegionDestination(); - if (isSpaceInMemoryList()) { - memoryList.addFirst(node); - node.incrementReferenceCount(); - } else { - flushToDisk(); + if (!node.isExpired()) { + try { + regionDestination = node.getMessage().getRegionDestination(); + if (isDiskListEmpty()) { + if (hasSpace()) { + memoryList.addFirst(node); + node.incrementReferenceCount(); + return; + } + } + if (!hasSpace()) { + if (isDiskListEmpty()) { + expireOldMessages(); + if (hasSpace()) { + memoryList.addFirst(node); + node.incrementReferenceCount(); + return; + } else { + flushToDisk(); + } + } + } systemUsage.getTempUsage().waitForSpace(); node.decrementReferenceCount(); getDiskList().addFirst(node); + + } catch (Exception e) { + LOG.error("Caught an Exception adding a message: " + node + + " first to FilePendingMessageCursor ", e); + throw new RuntimeException(e); } - } catch (Exception e) { - throw new RuntimeException(e); + } else { + discard(node); } } @@ -271,13 +317,17 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple super.setSystemUsage(usageManager); } - public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { + public void onUsageChanged(Usage usage, int oldPercentUsage, + int newPercentUsage) { if (newPercentUsage >= getMemoryUsageHighWaterMark()) { synchronized (this) { flushRequired = true; if (!iterating) { - flushToDisk(); - flushRequired = false; + expireOldMessages(); + if (!hasSpace()) { + flushToDisk(); + flushRequired = false; + } } } } @@ -290,8 +340,25 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple protected boolean isSpaceInMemoryList() { return hasSpace() && isDiskListEmpty(); } + + protected synchronized void expireOldMessages() { + if (!memoryList.isEmpty()) { + LinkedList tmpList = new LinkedList(this.memoryList); + this.memoryList = new LinkedList(); + while (!tmpList.isEmpty()) { + MessageReference node = tmpList.removeFirst(); + if (node.isExpired()) { + discard(node); + }else { + memoryList.add(node); + } + } + } + + } protected synchronized void flushToDisk() { + if (!memoryList.isEmpty()) { while (!memoryList.isEmpty()) { MessageReference node = memoryList.removeFirst(); @@ -312,10 +379,18 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple diskList = store.getListContainer(name, "TopicSubscription", true); diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat())); } catch (IOException e) { - e.printStackTrace(); + LOG.error("Caught an IO Exception getting the DiskList ",e); throw new RuntimeException(e); } } return diskList; } + + protected void discard(MessageReference message) { + message.decrementReferenceCount(); + if (LOG.isDebugEnabled()) { + LOG.debug("Discarding message " + message); + } + broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(), message); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index 290dff236d..e0fcfbe2a1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -94,7 +94,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message } public synchronized void addMessageLast(MessageReference node) throws Exception { - if (cacheEnabled && !isFull()) { + if (cacheEnabled && hasSpace()) { //optimization - A persistent queue will add the message to //to store then retrieve it again from the store. recoverMessage(node.getMessage()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index f5d38d6bfb..8f2dc75208 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -23,13 +23,13 @@ import java.util.LinkedList; import java.util.Map; import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; -import org.apache.activemq.kaha.Store; import org.apache.activemq.usage.SystemUsage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,16 +53,19 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { private final Subscription subscription; /** + * @param broker * @param topic * @param clientId * @param subscriberName + * @param maxBatchSize + * @param subscription * @throws IOException */ - public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize, Subscription subscription) { + public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) { this.clientId = clientId; this.subscriberName = subscriberName; this.subscription = subscription; - this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store); + this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName); storePrefetches.add(nonPersistent); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index cb1234550c..5521a648d2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region.cursors; import org.apache.activemq.ActiveMQMessageAudit; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; @@ -33,9 +34,9 @@ import org.apache.commons.logging.LogFactory; public class StoreQueueCursor extends AbstractPendingMessageCursor { private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class); + private Broker broker; private int pendingCount; private Queue queue; - private Store tmpStore; private PendingMessageCursor nonPersistent; private QueueStorePrefetch persistent; private boolean started; @@ -47,9 +48,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { * @param queue * @param tmpStore */ - public StoreQueueCursor(Queue queue, Store tmpStore) { + public StoreQueueCursor(Broker broker,Queue queue) { + this.broker=broker; this.queue = queue; - this.tmpStore = tmpStore; this.persistent = new QueueStorePrefetch(queue); currentCursor = persistent; } @@ -58,7 +59,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { started = true; super.start(); if (nonPersistent == null) { - nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore); + nonPersistent = new FilePendingMessageCursor(broker,queue.getName()); nonPersistent.setMaxBatchSize(getMaxBatchSize()); nonPersistent.setSystemUsage(systemUsage); nonPersistent.setEnableAudit(isEnableAudit()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java index 00a65febf5..235f948aa5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java @@ -16,10 +16,10 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.kaha.Store; /** * Creates a PendIngMessageCursor for Durable subscribers * @@ -33,14 +33,15 @@ public class FilePendingDurableSubscriberMessageStoragePolicy implements Pending /** * Retrieve the configured pending message storage cursor; + * @param broker * * @param clientId * @param name - * @param tmpStorage * @param maxBatchSize + * @param sub * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) { - return new FilePendingMessageCursor(name, tmpStorage); + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) { + return new FilePendingMessageCursor(broker,name); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java index cb906e4dda..f735435bfa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java @@ -16,10 +16,10 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.kaha.Store; /** * Creates a FilePendingMessageCursor * @@ -32,14 +32,14 @@ import org.apache.activemq.kaha.Store; public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy { /** + * @param broker * @param queue - * @param tmpStore * @return the cursor * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue, * org.apache.activemq.kaha.Store) */ - public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) { - return new FilePendingMessageCursor("PendingCursor:" + queue.getName(), tmpStore); + public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) { + return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java index 2ee55377cc..91f9eecb61 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.kaha.Store; /** * Creates a PendIngMessageCursor for Durable subscribers * @@ -31,15 +31,14 @@ import org.apache.activemq.kaha.Store; public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy { /** + * @param broker * @param name - * @param tmpStorage * @param maxBatchSize * @return a Cursor * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, * org.apache.activemq.kaha.Store, int) */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage, - int maxBatchSize) { - return new FilePendingMessageCursor("PendingCursor:" + name, tmpStorage); + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) { + return new FilePendingMessageCursor(broker,"PendingCursor:" + name); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java index 1c37bafbde..7c3b7ec39f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.kaha.Store; /** * Abstraction to allow different policies for holding messages awaiting @@ -30,12 +30,13 @@ public interface PendingDurableSubscriberMessageStoragePolicy { /** * Retrieve the configured pending message storage cursor; + * @param broker * * @param clientId * @param name - * @param tmpStorage * @param maxBatchSize + * @param sub * @return the Pending Message cursor */ - PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub); + PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java index 0976462471..de3d67444c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.kaha.Store; /** * Abstraction to allow different policies for holding messages awaiting @@ -30,10 +30,10 @@ public interface PendingQueueMessageStoragePolicy { /** * Retrieve the configured pending message storage cursor; + * @param broker * * @param queue - * @param tmpStore * @return the cursor */ - PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore); + PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java index d7a22dc865..537ec616e3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java @@ -16,8 +16,8 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.kaha.Store; /** * Abstraction to allow different policies for holding messages awaiting @@ -29,11 +29,11 @@ public interface PendingSubscriberMessageStoragePolicy { /** * Retrieve the configured pending message storage cursor; + * @param broker * * @param name - * @param tmpStorage * @param maxBatchSize * @return the Pending Message cursor */ - PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage, int maxBatchSize); + PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 38a3279b8f..f41a6c97b5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -25,7 +25,6 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.filter.DestinationMapEntry; -import org.apache.activemq.kaha.Store; import org.apache.activemq.usage.SystemUsage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,7 +57,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean producerFlowControl = true; private boolean optimizedDispatch=false; - public void configure(Queue queue, Store tmpStore) { + public void configure(Broker broker,Queue queue) { if (dispatchPolicy != null) { queue.setDispatchPolicy(dispatchPolicy); } @@ -70,7 +69,7 @@ public class PolicyEntry extends DestinationMapEntry { queue.getMemoryUsage().setLimit(memoryLimit); } if (pendingQueuePolicy != null) { - PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore); + PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue); queue.setMessages(messages); } queue.setProducerFlowControl(isProducerFlowControl()); @@ -121,16 +120,16 @@ public class PolicyEntry extends DestinationMapEntry { if (pendingSubscriberPolicy != null) { String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId(); int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize(); - subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name, broker.getTempDataStore(), maxBatchSize)); + subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize)); } } public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) { - String clientId = sub.getClientId(); - String subName = sub.getSubscriptionName(); + String clientId = sub.getSubscriptionKey().getClientId(); + String subName = sub.getSubscriptionKey().getSubscriptionName(); int prefetch = sub.getPrefetchSize(); if (pendingDurableSubscriberPolicy != null) { - PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch, sub); + PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,prefetch,sub); cursor.setSystemUsage(memoryManager); sub.setPending(cursor); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java index de7d651c8b..95eff5b94c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; @@ -34,14 +35,15 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin /** * Retrieve the configured pending message storage cursor; + * @param broker * * @param clientId * @param name - * @param tmpStorage * @param maxBatchSize + * @param sub * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) { - return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize, sub); + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) { + return new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java index 9c39d8d3ae..e873ef883d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java @@ -16,10 +16,10 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.StoreQueueCursor; -import org.apache.activemq.kaha.Store; /** * Creates a StoreQueueCursor * @@ -32,14 +32,14 @@ import org.apache.activemq.kaha.Store; public class StorePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy { /** + * @param broker * @param queue - * @param tmpStore * @return the cursor * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue, * org.apache.activemq.kaha.Store) */ - public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) { - return new StoreQueueCursor(queue, tmpStore); + public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) { + return new StoreQueueCursor(broker,queue); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java index c28306aa24..ebbe3f3ef6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java @@ -16,10 +16,10 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; -import org.apache.activemq.kaha.Store; /** * Creates a VMPendingMessageCursor * @@ -32,14 +32,14 @@ public class VMPendingDurableSubscriberMessageStoragePolicy implements PendingDu /** * Retrieve the configured pending message storage cursor; - * + * @param broker * @param clientId * @param name - * @param tmpStorage * @param maxBatchSize + * @param sub * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) { + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) { return new VMPendingMessageCursor(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java index ccfa06646a..ddd41ac3c1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java @@ -16,10 +16,10 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; -import org.apache.activemq.kaha.Store; /** * Creates a VMPendingMessageCursor * @@ -32,11 +32,11 @@ import org.apache.activemq.kaha.Store; public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy { /** + * @param broker * @param queue - * @param tmpStore * @return the cursor */ - public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) { + public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) { return new VMPendingMessageCursor(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java index 4eb01cfc88..db0c733bf7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; -import org.apache.activemq.kaha.Store; /** * Creates a VMPendingMessageCursor * @@ -31,15 +31,14 @@ import org.apache.activemq.kaha.Store; public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy { /** + * @param broker * @param name - * @param tmpStorage * @param maxBatchSize * @return a Cursor * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, * org.apache.activemq.kaha.Store, int) */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage, - int maxBatchSize) { + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) { return new VMPendingMessageCursor(); } }