git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@818147 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2009-09-23 15:54:37 +00:00
parent 5efe421cdc
commit 785454a366
14 changed files with 99 additions and 4 deletions

View File

@ -47,6 +47,7 @@ public abstract class AbstractSubscription implements Subscription {
protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
private BooleanExpression selectorExpression;
private ObjectName objectName;
private int cursorMemoryHighWaterMark = 70;
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
@ -211,6 +212,14 @@ public abstract class AbstractSubscription implements Subscription {
}
public int getCursorMemoryHighWaterMark(){
return this.cursorMemoryHighWaterMark;
}
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
}
public int countBeforeFull() {
return getDispatchedQueueSize() - info.getPrefetchSize();
}

View File

@ -50,7 +50,7 @@ public abstract class BaseDestination implements Destination {
protected final MessageStore store;
protected SystemUsage systemUsage;
protected MemoryUsage memoryUsage;
private boolean producerFlowControl = false;
private boolean producerFlowControl = true;
protected boolean warnOnProducerFlowControl = true;
private int maxProducersToAudit = 1024;
private int maxAuditDepth = 2048;
@ -73,6 +73,7 @@ public abstract class BaseDestination implements Destination {
protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
protected int cursorMemoryHighWaterMark = 70;
/**
* @param broker
@ -376,6 +377,14 @@ public abstract class BaseDestination implements Destination {
this.deadLetterStrategy = deadLetterStrategy;
}
public int getCursorMemoryHighWaterMark() {
return this.cursorMemoryHighWaterMark;
}
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
}
/**
* called when message is consumed
*
@ -511,5 +520,4 @@ public abstract class BaseDestination implements Destination {
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) throws Exception {
}
}

View File

@ -105,6 +105,10 @@ public interface Destination extends Service, Task {
public void setMinimumMessageSize(int minimumMessageSize);
public int getCursorMemoryHighWaterMark();
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
/**
* optionally called by a Subscriber - to inform the Destination its
* ready for more messages

View File

@ -265,4 +265,12 @@ public class DestinationFilter implements Destination {
MessageDispatchNotification messageDispatchNotification) throws Exception {
next.processDispatchNotification(messageDispatchNotification);
}
public int getCursorMemoryHighWaterMark() {
return next.getCursorMemoryHighWaterMark();
}
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
}
}

View File

@ -54,6 +54,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
super(broker,usageManager, context, info);
this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
this.pending.setSystemUsage(usageManager);
this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
@ -115,6 +116,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
synchronized (pending) {
pending.setSystemUsage(memoryManager);
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
pending.start();
// If nothing was in the persistent store, then try to use the

View File

@ -525,6 +525,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
this.pending = pending;
if (this.pending!=null) {
this.pending.setSystemUsage(usageManager);
this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
}
}

View File

@ -191,6 +191,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
messages.setMaxAuditDepth(getMaxAuditDepth());
messages.setMaxProducersToAudit(getMaxProducersToAudit());
messages.setUseCache(isUseCache());
messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
if (messages.isRecoveryRequired()) {
store.recover(new MessageRecoveryListener() {

View File

@ -223,4 +223,8 @@ public interface Subscription extends SubscriptionRecovery {
int countBeforeFull();
ConnectionContext getContext();
public int getCursorMemoryHighWaterMark();
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
}

View File

@ -52,6 +52,7 @@ public class TempQueue extends Queue{
public void initialize() throws Exception {
this.messages=new VMPendingMessageCursor();
this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName());

View File

@ -76,6 +76,7 @@ public class TopicSubscription extends AbstractSubscription {
public void init() throws Exception {
this.matched.setSystemUsage(usageManager);
this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.matched.start();
}

View File

@ -250,6 +250,16 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
}
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
if (persistent != null) {
persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
}
if (nonPersistent != null) {
nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
}
}
public synchronized void gc() {

View File

@ -33,6 +33,7 @@ import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.impl.sql.compile.GetCurrentConnectionNode;
/**
* Represents an entry in a {@link PolicyMap} for assigning policies to a
@ -59,7 +60,7 @@ public class PolicyEntry extends DestinationMapEntry {
private int maxAuditDepth=2048;
private int maxQueueAuditDepth=2048;
private boolean enableAudit=true;
private boolean producerFlowControl = false;
private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
@ -82,6 +83,7 @@ public class PolicyEntry extends DestinationMapEntry {
private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
private int cursorMemoryHighWaterMark=70;
public void configure(Broker broker,Queue queue) {
@ -140,6 +142,7 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
destination.setMaxExpirePageSize(getMaxExpirePageSize());
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@ -177,8 +180,8 @@ public class PolicyEntry extends DestinationMapEntry {
String clientId = sub.getSubscriptionKey().getClientId();
String subName = sub.getSubscriptionKey().getSubscriptionName();
int prefetch = sub.getPrefetchSize();
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
//override prefetch size if not set by the Consumer
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
sub.setPrefetchSize(getDurableTopicPrefetch());
}
@ -189,6 +192,7 @@ public class PolicyEntry extends DestinationMapEntry {
}
sub.setMaxAuditDepth(getMaxAuditDepth());
sub.setMaxProducersToAudit(getMaxProducersToAudit());
}
public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
@ -199,6 +203,7 @@ public class PolicyEntry extends DestinationMapEntry {
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH){
sub.setPrefetchSize(getQueueBrowserPrefetch());
}
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
}
public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
@ -209,6 +214,7 @@ public class PolicyEntry extends DestinationMapEntry {
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){
sub.setPrefetchSize(getQueuePrefetch());
}
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
}
// Properties
@ -662,6 +668,14 @@ public class PolicyEntry extends DestinationMapEntry {
this.durableTopicPrefetch = durableTopicPrefetch;
}
public int getCursorMemoryHighWaterMark() {
return this.cursorMemoryHighWaterMark;
}
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;
import junit.framework.TestCase;
public class PolicyConfigTest extends TestCase{
}

View File

@ -285,6 +285,14 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
public void acknowledge(ConnectionContext context, MessageAck ack)
throws Exception {
}
public int getCursorMemoryHighWaterMark(){
return 0;
}
public void setCursorMemoryHighWaterMark(
int cursorMemoryHighWaterMark) {
}
};
queue.addSubscription(contextNotInTx, subscription);