diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 05497c9a82..922a64d7a1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -73,6 +73,16 @@ public class DestinationView { return destination.getDestinationStatistics().getMessagesCached().getCount(); } + public int getMemoryPercentageUsed() { + return destination.getUsageManager().getPercentUsage(); + } + public long getMemoryLimit() { + return destination.getUsageManager().getLimit(); + } + public void setMemoryLimit(long limit) { + destination.getUsageManager().setLimit(limit); + } + public CompositeData[] browse() throws OpenDataException{ Message[] messages=destination.browse(); CompositeData c[]=new CompositeData[messages.length]; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 83f7a81c81..d3a44e998a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -76,4 +76,9 @@ public interface DestinationViewMBean { * @throws Exception */ public String sendTextMessage(Map headers, String body) throws Exception; + + public int getMemoryPercentageUsed(); + public long getMemoryLimit(); + public void setMemoryLimit(long limit); + } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 6f0eefd490..b8548bb55d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -79,7 +79,9 @@ public class Queue implements Destination { public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { this.destination = destination; - this.usageManager = memoryManager; + this.usageManager = new UsageManager(memoryManager); + this.usageManager.setLimit(Long.MAX_VALUE); + this.store = store; destinationStatistics.setParent(parentStats); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 90c8e732b3..82ee44d428 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -73,7 +73,8 @@ public class Topic implements Destination { this.destination = destination; this.store = store; - this.usageManager = memoryManager; + this.usageManager = new UsageManager(memoryManager); + this.usageManager.setLimit(Long.MAX_VALUE); this.destinationStatistics.setParent(parentStats); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 64654b17b1..eb3af88c4d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -42,6 +42,7 @@ public class PolicyEntry extends DestinationMapEntry { private int messageGroupHashBucketCount = 1024; private PendingMessageLimitStrategy pendingMessageLimitStrategy; private MessageEvictionStrategy messageEvictionStrategy; + private long memoryLimit; public void configure(Queue queue) { if (dispatchPolicy != null) { @@ -51,6 +52,9 @@ public class PolicyEntry extends DestinationMapEntry { queue.setDeadLetterStrategy(deadLetterStrategy); } queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount); + if( memoryLimit>0 ) { + queue.getUsageManager().setLimit(memoryLimit); + } } public void configure(Topic topic) { @@ -64,6 +68,9 @@ public class PolicyEntry extends DestinationMapEntry { topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy); } topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); + if( memoryLimit>0 ) { + topic.getUsageManager().setLimit(memoryLimit); + } } public void configure(TopicSubscription subscription) { @@ -173,4 +180,12 @@ public class PolicyEntry extends DestinationMapEntry { this.messageEvictionStrategy = messageEvictionStrategy; } + public long getMemoryLimit() { + return memoryLimit; + } + + public void setMemoryLimit(long memoryLimit) { + this.memoryLimit = memoryLimit; + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 4362fa2208..06530d8094 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -56,7 +56,7 @@ public class StompTest extends CombinationTestSupport { broker.start(); URI connectUri = connector.getConnectUri(); - stompSocket = new Socket(connectUri.getHost(), connectUri.getPort()); + stompSocket = new Socket("127.0.0.1", connectUri.getPort()); inputBuffer = new ByteArrayOutputStream(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");