mirror of https://github.com/apache/activemq.git
Merged /activemq/trunk:r818140-818147,818160-818176,818225-818262
git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@818418 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
88d5c9c296
commit
70144c9fe9
|
@ -47,6 +47,7 @@ public abstract class AbstractSubscription implements Subscription {
|
|||
protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
|
||||
private BooleanExpression selectorExpression;
|
||||
private ObjectName objectName;
|
||||
private int cursorMemoryHighWaterMark = 70;
|
||||
|
||||
|
||||
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
|
||||
|
@ -211,6 +212,14 @@ public abstract class AbstractSubscription implements Subscription {
|
|||
|
||||
}
|
||||
|
||||
public int getCursorMemoryHighWaterMark(){
|
||||
return this.cursorMemoryHighWaterMark;
|
||||
}
|
||||
|
||||
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
|
||||
this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
|
||||
}
|
||||
|
||||
public int countBeforeFull() {
|
||||
return getDispatchedQueueSize() - info.getPrefetchSize();
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ public abstract class BaseDestination implements Destination {
|
|||
protected final MessageStore store;
|
||||
protected SystemUsage systemUsage;
|
||||
protected MemoryUsage memoryUsage;
|
||||
private boolean producerFlowControl = false;
|
||||
private boolean producerFlowControl = true;
|
||||
private int maxProducersToAudit = 1024;
|
||||
private int maxAuditDepth = 2048;
|
||||
private boolean enableAudit = true;
|
||||
|
@ -72,6 +72,7 @@ public abstract class BaseDestination implements Destination {
|
|||
protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
|
||||
protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
|
||||
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
|
||||
protected int cursorMemoryHighWaterMark = 70;
|
||||
|
||||
/**
|
||||
* @param broker
|
||||
|
@ -374,6 +375,14 @@ public abstract class BaseDestination implements Destination {
|
|||
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
|
||||
this.deadLetterStrategy = deadLetterStrategy;
|
||||
}
|
||||
|
||||
public int getCursorMemoryHighWaterMark() {
|
||||
return this.cursorMemoryHighWaterMark;
|
||||
}
|
||||
|
||||
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
|
||||
this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
|
||||
}
|
||||
|
||||
/**
|
||||
* called when message is consumed
|
||||
|
@ -510,5 +519,4 @@ public abstract class BaseDestination implements Destination {
|
|||
public void processDispatchNotification(
|
||||
MessageDispatchNotification messageDispatchNotification) throws Exception {
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -105,6 +105,10 @@ public interface Destination extends Service, Task {
|
|||
|
||||
public void setMinimumMessageSize(int minimumMessageSize);
|
||||
|
||||
public int getCursorMemoryHighWaterMark();
|
||||
|
||||
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
|
||||
|
||||
/**
|
||||
* optionally called by a Subscriber - to inform the Destination its
|
||||
* ready for more messages
|
||||
|
|
|
@ -265,4 +265,12 @@ public class DestinationFilter implements Destination {
|
|||
MessageDispatchNotification messageDispatchNotification) throws Exception {
|
||||
next.processDispatchNotification(messageDispatchNotification);
|
||||
}
|
||||
|
||||
public int getCursorMemoryHighWaterMark() {
|
||||
return next.getCursorMemoryHighWaterMark();
|
||||
}
|
||||
|
||||
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
|
||||
next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
super(broker,usageManager, context, info);
|
||||
this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
|
||||
this.pending.setSystemUsage(usageManager);
|
||||
this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
this.keepDurableSubsActive = keepDurableSubsActive;
|
||||
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
|
||||
|
||||
|
@ -115,6 +116,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
}
|
||||
synchronized (pending) {
|
||||
pending.setSystemUsage(memoryManager);
|
||||
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
pending.start();
|
||||
|
||||
// If nothing was in the persistent store, then try to use the
|
||||
|
|
|
@ -525,6 +525,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
this.pending = pending;
|
||||
if (this.pending!=null) {
|
||||
this.pending.setSystemUsage(usageManager);
|
||||
this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -191,6 +191,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
messages.setMaxAuditDepth(getMaxAuditDepth());
|
||||
messages.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
messages.setUseCache(isUseCache());
|
||||
messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
if (messages.isRecoveryRequired()) {
|
||||
store.recover(new MessageRecoveryListener() {
|
||||
|
||||
|
|
|
@ -223,4 +223,8 @@ public interface Subscription extends SubscriptionRecovery {
|
|||
int countBeforeFull();
|
||||
|
||||
ConnectionContext getContext();
|
||||
|
||||
public int getCursorMemoryHighWaterMark();
|
||||
|
||||
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ public class TempQueue extends Queue{
|
|||
|
||||
public void initialize() throws Exception {
|
||||
this.messages=new VMPendingMessageCursor();
|
||||
this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
this.systemUsage = brokerService.getSystemUsage();
|
||||
memoryUsage.setParent(systemUsage.getMemoryUsage());
|
||||
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName());
|
||||
|
|
|
@ -76,6 +76,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
|
||||
public void init() throws Exception {
|
||||
this.matched.setSystemUsage(usageManager);
|
||||
this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
this.matched.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -249,6 +249,16 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
|||
nonPersistent.setUseCache(useCache);
|
||||
}
|
||||
}
|
||||
|
||||
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
|
||||
super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
|
||||
if (persistent != null) {
|
||||
persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
|
||||
}
|
||||
if (nonPersistent != null) {
|
||||
nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private int maxAuditDepth=2048;
|
||||
private int maxQueueAuditDepth=2048;
|
||||
private boolean enableAudit=true;
|
||||
private boolean producerFlowControl = false;
|
||||
private boolean producerFlowControl = true;
|
||||
private boolean optimizedDispatch=false;
|
||||
private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
|
||||
private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
|
||||
|
@ -82,6 +82,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
|
||||
private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
|
||||
private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
|
||||
private int cursorMemoryHighWaterMark=70;
|
||||
|
||||
|
||||
public void configure(Broker broker,Queue queue) {
|
||||
|
@ -140,6 +141,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
|
||||
destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
|
||||
destination.setMaxExpirePageSize(getMaxExpirePageSize());
|
||||
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
}
|
||||
|
||||
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
||||
|
@ -177,8 +179,8 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
String clientId = sub.getSubscriptionKey().getClientId();
|
||||
String subName = sub.getSubscriptionKey().getSubscriptionName();
|
||||
int prefetch = sub.getPrefetchSize();
|
||||
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
//override prefetch size if not set by the Consumer
|
||||
|
||||
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
|
||||
sub.setPrefetchSize(getDurableTopicPrefetch());
|
||||
}
|
||||
|
@ -189,6 +191,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
}
|
||||
sub.setMaxAuditDepth(getMaxAuditDepth());
|
||||
sub.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
|
||||
}
|
||||
|
||||
public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
|
||||
|
@ -199,6 +202,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH){
|
||||
sub.setPrefetchSize(getQueueBrowserPrefetch());
|
||||
}
|
||||
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
}
|
||||
|
||||
public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
|
||||
|
@ -209,6 +213,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){
|
||||
sub.setPrefetchSize(getQueuePrefetch());
|
||||
}
|
||||
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
}
|
||||
|
||||
// Properties
|
||||
|
@ -661,6 +666,14 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setDurableTopicPrefetch(int durableTopicPrefetch) {
|
||||
this.durableTopicPrefetch = durableTopicPrefetch;
|
||||
}
|
||||
|
||||
public int getCursorMemoryHighWaterMark() {
|
||||
return this.cursorMemoryHighWaterMark;
|
||||
}
|
||||
|
||||
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
|
||||
this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
public class PolicyConfigTest extends TestCase{
|
||||
|
||||
public void testNoop() {
|
||||
}
|
||||
|
||||
}
|
|
@ -285,6 +285,14 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
|
|||
public void acknowledge(ConnectionContext context, MessageAck ack)
|
||||
throws Exception {
|
||||
}
|
||||
|
||||
public int getCursorMemoryHighWaterMark(){
|
||||
return 0;
|
||||
}
|
||||
|
||||
public void setCursorMemoryHighWaterMark(
|
||||
int cursorMemoryHighWaterMark) {
|
||||
}
|
||||
};
|
||||
|
||||
queue.addSubscription(contextNotInTx, subscription);
|
||||
|
|
Loading…
Reference in New Issue