mirror of https://github.com/apache/activemq.git
Applied patch in http://jira.activemq.org/jira/browse/AMQ-633
Thanks Brian Diesenhaus! Also exposed the memory limits and usage via JMX and allowed the limit to be configured via a destination policy. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@386199 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8d589ec6fa
commit
36722d8774
|
@ -73,6 +73,16 @@ public class DestinationView {
|
|||
return destination.getDestinationStatistics().getMessagesCached().getCount();
|
||||
}
|
||||
|
||||
public int getMemoryPercentageUsed() {
|
||||
return destination.getUsageManager().getPercentUsage();
|
||||
}
|
||||
public long getMemoryLimit() {
|
||||
return destination.getUsageManager().getLimit();
|
||||
}
|
||||
public void setMemoryLimit(long limit) {
|
||||
destination.getUsageManager().setLimit(limit);
|
||||
}
|
||||
|
||||
public CompositeData[] browse() throws OpenDataException{
|
||||
Message[] messages=destination.browse();
|
||||
CompositeData c[]=new CompositeData[messages.length];
|
||||
|
|
|
@ -76,4 +76,9 @@ public interface DestinationViewMBean {
|
|||
* @throws Exception
|
||||
*/
|
||||
public String sendTextMessage(Map headers, String body) throws Exception;
|
||||
|
||||
public int getMemoryPercentageUsed();
|
||||
public long getMemoryLimit();
|
||||
public void setMemoryLimit(long limit);
|
||||
|
||||
}
|
|
@ -79,7 +79,9 @@ public class Queue implements Destination {
|
|||
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store,
|
||||
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
|
||||
this.destination = destination;
|
||||
this.usageManager = memoryManager;
|
||||
this.usageManager = new UsageManager(memoryManager);
|
||||
this.usageManager.setLimit(Long.MAX_VALUE);
|
||||
|
||||
this.store = store;
|
||||
|
||||
destinationStatistics.setParent(parentStats);
|
||||
|
|
|
@ -73,7 +73,8 @@ public class Topic implements Destination {
|
|||
|
||||
this.destination = destination;
|
||||
this.store = store;
|
||||
this.usageManager = memoryManager;
|
||||
this.usageManager = new UsageManager(memoryManager);
|
||||
this.usageManager.setLimit(Long.MAX_VALUE);
|
||||
this.destinationStatistics.setParent(parentStats);
|
||||
}
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private int messageGroupHashBucketCount = 1024;
|
||||
private PendingMessageLimitStrategy pendingMessageLimitStrategy;
|
||||
private MessageEvictionStrategy messageEvictionStrategy;
|
||||
private long memoryLimit;
|
||||
|
||||
public void configure(Queue queue) {
|
||||
if (dispatchPolicy != null) {
|
||||
|
@ -51,6 +52,9 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
queue.setDeadLetterStrategy(deadLetterStrategy);
|
||||
}
|
||||
queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount);
|
||||
if( memoryLimit>0 ) {
|
||||
queue.getUsageManager().setLimit(memoryLimit);
|
||||
}
|
||||
}
|
||||
|
||||
public void configure(Topic topic) {
|
||||
|
@ -64,6 +68,9 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy);
|
||||
}
|
||||
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
|
||||
if( memoryLimit>0 ) {
|
||||
topic.getUsageManager().setLimit(memoryLimit);
|
||||
}
|
||||
}
|
||||
|
||||
public void configure(TopicSubscription subscription) {
|
||||
|
@ -173,4 +180,12 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
this.messageEvictionStrategy = messageEvictionStrategy;
|
||||
}
|
||||
|
||||
public long getMemoryLimit() {
|
||||
return memoryLimit;
|
||||
}
|
||||
|
||||
public void setMemoryLimit(long memoryLimit) {
|
||||
this.memoryLimit = memoryLimit;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ public class StompTest extends CombinationTestSupport {
|
|||
broker.start();
|
||||
|
||||
URI connectUri = connector.getConnectUri();
|
||||
stompSocket = new Socket(connectUri.getHost(), connectUri.getPort());
|
||||
stompSocket = new Socket("127.0.0.1", connectUri.getPort());
|
||||
inputBuffer = new ByteArrayOutputStream();
|
||||
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
|
||||
|
|
Loading…
Reference in New Issue