git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@614206 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-22 14:28:10 +00:00
parent 17e22cff6f
commit 23cda2d6bb
28 changed files with 209 additions and 139 deletions

View File

@ -93,7 +93,7 @@ public class BrokerView implements BrokerViewMBean {
return broker.getDestinationStatistics().getMessagesCached().getCount();
}
public int getMemoryPercentageUsed() {
public int getMemoryPercentUsage() {
return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
}
@ -109,16 +109,16 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.getSystemUsage().getStoreUsage().getLimit();
}
public int getStorePercentageUsed() {
public int getStorePercentUsage() {
return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
}
public long getTmpLimit() {
public long getTempLimit() {
return brokerService.getSystemUsage().getTempUsage().getLimit();
}
public int getTmpPercentageUsed() {
public int getTempPercentUsage() {
return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
}
@ -126,7 +126,7 @@ public class BrokerView implements BrokerViewMBean {
brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
}
public void setTmpLimit(long limit) {
public void setTempLimit(long limit) {
brokerService.getSystemUsage().getTempUsage().setLimit(limit);
}
@ -172,7 +172,7 @@ public class BrokerView implements BrokerViewMBean {
}
public ObjectName[] getTopicSubscribers() {
return broker.getTemporaryTopicSubscribers();
return broker.getTopicSubscribers();
}
public ObjectName[] getDurableTopicSubscribers() {

View File

@ -61,23 +61,23 @@ public interface BrokerViewMBean extends Service {
long getTotalMessageCount();
int getMemoryPercentageUsed();
int getMemoryPercentUsage();
long getMemoryLimit();
void setMemoryLimit(long limit);
int getStorePercentageUsed();
int getStorePercentUsage();
long getStoreLimit();
void setStoreLimit(long limit);
int getTmpPercentageUsed();
int getTempPercentUsage();
long getTmpLimit();
long getTempLimit();
void setTmpLimit(long limit);
void setTempLimit(long limit);
boolean isPersistent();

View File

@ -93,7 +93,7 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getMessagesCached().getCount();
}
public int getMemoryPercentageUsed() {
public int getMemoryPercentUsage() {
return destination.getMemoryUsage().getPercentUsage();
}
@ -294,7 +294,7 @@ public class DestinationView implements DestinationViewMBean {
}
public float getMemoryLimitPortion() {
public float getMemoryUsagePortion() {
return destination.getMemoryUsage().getUsagePortion();
}
@ -306,7 +306,7 @@ public class DestinationView implements DestinationViewMBean {
return destination.isProducerFlowControl();
}
public void setMemoryLimitPortion(float value) {
public void setMemoryUsagePortion(float value) {
destination.getMemoryUsage().setUsagePortion(value);
}

View File

@ -127,7 +127,7 @@ public interface DestinationViewMBean {
/**
* @return the percentage of amount of memory used
*/
int getMemoryPercentageUsed();
int getMemoryPercentUsage();
/**
* @return the amount of memory allocated to this destination
@ -143,13 +143,13 @@ public interface DestinationViewMBean {
/**
* @return the portion of memory from the broker memory limit for this destination
*/
float getMemoryLimitPortion();
float getMemoryUsagePortion();
/**
* set the portion of memory from the broker memory limit for this destination
* @param value
*/
void setMemoryLimitPortion(float value);
void setMemoryUsagePortion(float value);
/**
* Browses the current destination returning a list of messages

View File

@ -139,10 +139,7 @@ public abstract class BaseDestination implements Destination {
return destination;
}
public final String getDestination() {
return destination.getPhysicalName();
}
public final String getName() {
return getActiveMQDestination().getPhysicalName();
}

View File

@ -76,7 +76,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
if (destination.isQueue()) {
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory, broker.getTempDataStore()) {
return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume
@ -90,7 +90,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
};
} else {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory, broker.getTempDataStore());
Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
queue.initialize();
return queue;
@ -127,7 +127,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(queue, broker.getTempDataStore());
entry.configure(broker,queue);
}
}
}

View File

@ -52,7 +52,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws JMSException {
super(broker,usageManager, context, info);
this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
this.pending.setSystemUsage(usageManager);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
@ -218,19 +218,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
node.decrementReferenceCount();
}
public String getSubscriptionName() {
return subscriptionKey.getSubscriptionName();
}
public synchronized String toString() {
return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
+ getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
}
public String getClientId() {
return subscriptionKey.getClientId();
}
public SubscriptionKey getSubscriptionKey() {
return subscriptionKey;
}

View File

@ -95,14 +95,14 @@ public class Queue extends BaseDestination implements Task {
};
};
public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
super(broker, store, destination,systemUsage, parentStats);
if (destination.isTemporary() || tmpStore==null ) {
if (destination.isTemporary() || broker == null || store==null ) {
this.messages = new VMPendingMessageCursor();
} else {
this.messages = new StoreQueueCursor(this, tmpStore);
this.messages = new StoreQueueCursor(broker,this);
}
this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName());
@ -318,11 +318,11 @@ public class Queue extends BaseDestination implements Task {
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
message.setRegionDestination(this);
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
if (message.isExpired()) {
broker.messageExpired(context, message);
broker.getRoot().messageExpired(context, message);
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
if (sendProducerAck) {
@ -402,6 +402,7 @@ public class Queue extends BaseDestination implements Task {
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
broker.getRoot().messageExpired(context, message);
return;
}
}
@ -416,7 +417,6 @@ public class Queue extends BaseDestination implements Task {
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
synchronized (sendLock) {
message.setRegionDestination(this);
if (store != null && message.isPersistent()) {
while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
if (context.getStopping().get()) {
@ -678,11 +678,7 @@ public class Queue extends BaseDestination implements Task {
// We should only delete messages that can be locked.
if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination);
ack.setMessageID(r.getMessageId());
removeMessage(c, null, r, ack);
removeMessage(c,(IndirectMessageReference) r);
}
} catch (IOException e) {
}

View File

@ -705,14 +705,18 @@ public class RegionBroker implements Broker {
deadLetterDestination);
sent=true;
}
}else {
//don't want to warn about failing to send
// if there isn't a dead letter strategy
sent=true;
}
}
}
if(sent==false){
LOG.warn("Failed to send "+node+" to dead letter queue");
LOG.warn("Failed to send "+node+" to DLQ");
}
}catch(Exception e){
LOG.warn("Failed to pass expired message to dead letter queue",e);
LOG.warn("Caught an exception sending to DLQ: "+node,e);
}
}

View File

@ -41,7 +41,7 @@ public class TempQueueRegion extends AbstractRegion {
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory, null) {
return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {

View File

@ -184,8 +184,8 @@ public class Topic extends BaseDestination implements Task{
}
// Recover the durable subscription.
String clientId = subscription.getClientId();
String subscriptionName = subscription.getSubscriptionName();
String clientId = subscription.getSubscriptionKey().getClientId();
String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
String selector = subscription.getConsumerInfo().getSelector();
SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
if (info != null) {
@ -435,7 +435,8 @@ public class Topic extends BaseDestination implements Task{
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
topicStore.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId());
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
}
}

View File

@ -68,11 +68,10 @@ public class TopicSubscription extends AbstractSubscription {
super(broker, context, info);
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
Store tempDataStore = broker.getTempDataStore();
if (tempDataStore != null) {
this.matched = new FilePendingMessageCursor(matchedName, tempDataStore);
} else {
if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
this.matched = new VMPendingMessageCursor();
} else {
this.matched = new FilePendingMessageCursor(broker,matchedName);
}
}

View File

@ -21,6 +21,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
@ -32,6 +35,8 @@ import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* persist pending messages pending message (messages awaiting dispatch to a
@ -40,14 +45,14 @@ import org.apache.activemq.usage.UsageListener;
* @version $Revision$
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
private static final AtomicLong NAME_COUNT = new AtomicLong();
protected Broker broker;
private Store store;
private String name;
private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
private ListContainer<MessageReference> diskList;
private Iterator iter;
private Iterator<MessageReference> iter;
private Destination regionDestination;
private boolean iterating;
private boolean flushRequired;
@ -58,9 +63,10 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @param name
* @param store
*/
public FilePendingMessageCursor(String name, Store store) {
public FilePendingMessageCursor(Broker broker,String name) {
this.broker = broker;
this.store= broker.getTempDataStore();
this.name = NAME_COUNT.incrementAndGet() + "_" + name;
this.store = store;
}
public void start() throws Exception {
@ -157,19 +163,39 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @param node
*/
public synchronized void addMessageLast(MessageReference node) {
try {
regionDestination = node.getMessage().getRegionDestination();
if (isSpaceInMemoryList()) {
memoryList.add(node);
node.incrementReferenceCount();
} else {
flushToDisk();
node.decrementReferenceCount();
if (!node.isExpired()) {
try {
regionDestination = node.getMessage().getRegionDestination();
if (isDiskListEmpty()) {
if (hasSpace()) {
memoryList.add(node);
node.incrementReferenceCount();
return;
}
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) {
memoryList.add(node);
node.incrementReferenceCount();
return;
} else {
flushToDisk();
}
}
}
systemUsage.getTempUsage().waitForSpace();
getDiskList().addLast(node);
node.decrementReferenceCount();
getDiskList().add(node);
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node
+ " first to FilePendingMessageCursor ", e);
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
} else {
discard(node);
}
}
@ -179,19 +205,39 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @param node
*/
public synchronized void addMessageFirst(MessageReference node) {
try {
regionDestination = node.getMessage().getRegionDestination();
if (isSpaceInMemoryList()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
} else {
flushToDisk();
if (!node.isExpired()) {
try {
regionDestination = node.getMessage().getRegionDestination();
if (isDiskListEmpty()) {
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
return;
}
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
return;
} else {
flushToDisk();
}
}
}
systemUsage.getTempUsage().waitForSpace();
node.decrementReferenceCount();
getDiskList().addFirst(node);
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node
+ " first to FilePendingMessageCursor ", e);
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
} else {
discard(node);
}
}
@ -271,13 +317,17 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
super.setSystemUsage(usageManager);
}
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
public void onUsageChanged(Usage usage, int oldPercentUsage,
int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
synchronized (this) {
flushRequired = true;
if (!iterating) {
flushToDisk();
flushRequired = false;
expireOldMessages();
if (!hasSpace()) {
flushToDisk();
flushRequired = false;
}
}
}
}
@ -290,8 +340,25 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
protected boolean isSpaceInMemoryList() {
return hasSpace() && isDiskListEmpty();
}
protected synchronized void expireOldMessages() {
if (!memoryList.isEmpty()) {
LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
this.memoryList = new LinkedList<MessageReference>();
while (!tmpList.isEmpty()) {
MessageReference node = tmpList.removeFirst();
if (node.isExpired()) {
discard(node);
}else {
memoryList.add(node);
}
}
}
}
protected synchronized void flushToDisk() {
if (!memoryList.isEmpty()) {
while (!memoryList.isEmpty()) {
MessageReference node = memoryList.removeFirst();
@ -312,10 +379,18 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
diskList = store.getListContainer(name, "TopicSubscription", true);
diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
} catch (IOException e) {
e.printStackTrace();
LOG.error("Caught an IO Exception getting the DiskList ",e);
throw new RuntimeException(e);
}
}
return diskList;
}
protected void discard(MessageReference message) {
message.decrementReferenceCount();
if (LOG.isDebugEnabled()) {
LOG.debug("Discarding message " + message);
}
broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(), message);
}
}

View File

@ -94,7 +94,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
}
public synchronized void addMessageLast(MessageReference node) throws Exception {
if (cacheEnabled && !isFull()) {
if (cacheEnabled && hasSpace()) {
//optimization - A persistent queue will add the message to
//to store then retrieve it again from the store.
recoverMessage(node.getMessage());

View File

@ -23,13 +23,13 @@ import java.util.LinkedList;
import java.util.Map;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -53,16 +53,19 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private final Subscription subscription;
/**
* @param broker
* @param topic
* @param clientId
* @param subscriberName
* @param maxBatchSize
* @param subscription
* @throws IOException
*/
public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize, Subscription subscription) {
public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
this.clientId = clientId;
this.subscriberName = subscriberName;
this.subscription = subscription;
this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store);
this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName);
storePrefetches.add(nonPersistent);
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
@ -33,9 +34,9 @@ import org.apache.commons.logging.LogFactory;
public class StoreQueueCursor extends AbstractPendingMessageCursor {
private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
private Broker broker;
private int pendingCount;
private Queue queue;
private Store tmpStore;
private PendingMessageCursor nonPersistent;
private QueueStorePrefetch persistent;
private boolean started;
@ -47,9 +48,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
* @param queue
* @param tmpStore
*/
public StoreQueueCursor(Queue queue, Store tmpStore) {
public StoreQueueCursor(Broker broker,Queue queue) {
this.broker=broker;
this.queue = queue;
this.tmpStore = tmpStore;
this.persistent = new QueueStorePrefetch(queue);
currentCursor = persistent;
}
@ -58,7 +59,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
started = true;
super.start();
if (nonPersistent == null) {
nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore);
nonPersistent = new FilePendingMessageCursor(broker,queue.getName());
nonPersistent.setMaxBatchSize(getMaxBatchSize());
nonPersistent.setSystemUsage(systemUsage);
nonPersistent.setEnableAudit(isEnableAudit());

View File

@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Creates a PendIngMessageCursor for Durable subscribers *
@ -33,14 +33,15 @@ public class FilePendingDurableSubscriberMessageStoragePolicy implements Pending
/**
* Retrieve the configured pending message storage cursor;
* @param broker
*
* @param clientId
* @param name
* @param tmpStorage
* @param maxBatchSize
* @param sub
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
return new FilePendingMessageCursor(name, tmpStorage);
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
return new FilePendingMessageCursor(broker,name);
}
}

View File

@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Creates a FilePendingMessageCursor *
@ -32,14 +32,14 @@ import org.apache.activemq.kaha.Store;
public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
/**
* @param broker
* @param queue
* @param tmpStore
* @return the cursor
* @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
* org.apache.activemq.kaha.Store)
*/
public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
return new FilePendingMessageCursor("PendingCursor:" + queue.getName(), tmpStore);
public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName());
}
}

View File

@ -16,9 +16,9 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Creates a PendIngMessageCursor for Durable subscribers *
@ -31,15 +31,14 @@ import org.apache.activemq.kaha.Store;
public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
/**
* @param broker
* @param name
* @param tmpStorage
* @param maxBatchSize
* @return a Cursor
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
* org.apache.activemq.kaha.Store, int)
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage,
int maxBatchSize) {
return new FilePendingMessageCursor("PendingCursor:" + name, tmpStorage);
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
return new FilePendingMessageCursor(broker,"PendingCursor:" + name);
}
}

View File

@ -16,9 +16,9 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Abstraction to allow different policies for holding messages awaiting
@ -30,12 +30,13 @@ public interface PendingDurableSubscriberMessageStoragePolicy {
/**
* Retrieve the configured pending message storage cursor;
* @param broker
*
* @param clientId
* @param name
* @param tmpStorage
* @param maxBatchSize
* @param sub
* @return the Pending Message cursor
*/
PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub);
PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub);
}

View File

@ -16,9 +16,9 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Abstraction to allow different policies for holding messages awaiting
@ -30,10 +30,10 @@ public interface PendingQueueMessageStoragePolicy {
/**
* Retrieve the configured pending message storage cursor;
* @param broker
*
* @param queue
* @param tmpStore
* @return the cursor
*/
PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore);
PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue);
}

View File

@ -16,8 +16,8 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Abstraction to allow different policies for holding messages awaiting
@ -29,11 +29,11 @@ public interface PendingSubscriberMessageStoragePolicy {
/**
* Retrieve the configured pending message storage cursor;
* @param broker
*
* @param name
* @param tmpStorage
* @param maxBatchSize
* @return the Pending Message cursor
*/
PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage, int maxBatchSize);
PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize);
}

View File

@ -25,7 +25,6 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -58,7 +57,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
public void configure(Queue queue, Store tmpStore) {
public void configure(Broker broker,Queue queue) {
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
}
@ -70,7 +69,7 @@ public class PolicyEntry extends DestinationMapEntry {
queue.getMemoryUsage().setLimit(memoryLimit);
}
if (pendingQueuePolicy != null) {
PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore);
PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
queue.setMessages(messages);
}
queue.setProducerFlowControl(isProducerFlowControl());
@ -121,16 +120,16 @@ public class PolicyEntry extends DestinationMapEntry {
if (pendingSubscriberPolicy != null) {
String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name, broker.getTempDataStore(), maxBatchSize));
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
}
}
public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
String clientId = sub.getClientId();
String subName = sub.getSubscriptionName();
String clientId = sub.getSubscriptionKey().getClientId();
String subName = sub.getSubscriptionKey().getSubscriptionName();
int prefetch = sub.getPrefetchSize();
if (pendingDurableSubscriberPolicy != null) {
PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch, sub);
PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,prefetch,sub);
cursor.setSystemUsage(memoryManager);
sub.setPending(cursor);
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
@ -34,14 +35,15 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin
/**
* Retrieve the configured pending message storage cursor;
* @param broker
*
* @param clientId
* @param name
* @param tmpStorage
* @param maxBatchSize
* @param sub
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize, sub);
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
return new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
}
}

View File

@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.kaha.Store;
/**
* Creates a StoreQueueCursor *
@ -32,14 +32,14 @@ import org.apache.activemq.kaha.Store;
public class StorePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
/**
* @param broker
* @param queue
* @param tmpStore
* @return the cursor
* @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
* org.apache.activemq.kaha.Store)
*/
public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
return new StoreQueueCursor(queue, tmpStore);
public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
return new StoreQueueCursor(broker,queue);
}
}

View File

@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor *
@ -32,14 +32,14 @@ public class VMPendingDurableSubscriberMessageStoragePolicy implements PendingDu
/**
* Retrieve the configured pending message storage cursor;
*
* @param broker
* @param clientId
* @param name
* @param tmpStorage
* @param maxBatchSize
* @param sub
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) {
return new VMPendingMessageCursor();
}
}

View File

@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor *
@ -32,11 +32,11 @@ import org.apache.activemq.kaha.Store;
public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
/**
* @param broker
* @param queue
* @param tmpStore
* @return the cursor
*/
public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
return new VMPendingMessageCursor();
}
}

View File

@ -16,9 +16,9 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor *
@ -31,15 +31,14 @@ import org.apache.activemq.kaha.Store;
public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
/**
* @param broker
* @param name
* @param tmpStorage
* @param maxBatchSize
* @return a Cursor
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
* org.apache.activemq.kaha.Store, int)
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage,
int maxBatchSize) {
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
return new VMPendingMessageCursor();
}
}