diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 3055b574ed..d795c4d5e9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -157,6 +157,21 @@ public class DestinationView implements DestinationViewMBean { return destination.getDestinationStatistics().getProcessTime().getAverageTime(); } + @Override + public int getTempUsagePercentUsage() { + return destination.getTempUsage().getPercentUsage(); + } + + @Override + public long getTempUsageLimit() { + return destination.getTempUsage().getLimit(); + } + + @Override + public void setTempUsageLimit(long limit) { + destination.getTempUsage().setLimit(limit); + } + @Override public long getMaxEnqueueTime() { return destination.getDestinationStatistics().getProcessTime().getMaxTime(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 7e09948515..ad0ae32e4c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -238,6 +238,24 @@ public interface DestinationViewMBean { */ void setMemoryLimit(long limit); + /** + * @return the percentage of amount of temp usage used + */ + @MBeanInfo("The percentage of the temp usage limit used") + int getTempUsagePercentUsage(); + + /** + * @return the amount of temp usage allocated to this destination + */ + @MBeanInfo("Temp usage limit, in bytes, assigned to this destination.") + long getTempUsageLimit(); + + /** + * set the amount of temp usage allocated to this destination + * @param limit the amount of temp usage allocated to this destination + */ + void setTempUsageLimit(long limit); + /** * @return the portion of memory from the broker memory limit for this destination */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 95c7e159a3..805ef6f61a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -42,6 +42,7 @@ import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.usage.TempUsage; import org.apache.activemq.usage.Usage; import org.slf4j.Logger; @@ -278,6 +279,11 @@ public abstract class BaseDestination implements Destination { this.memoryUsage = memoryUsage; } + @Override + public TempUsage getTempUsage() { + return systemUsage.getTempUsage(); + } + @Override public DestinationStatistics getDestinationStatistics() { return destinationStatistics; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 81e7fa1ac2..031015e1f4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -33,6 +33,7 @@ import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.Task; import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.TempUsage; import org.apache.activemq.usage.Usage; /** @@ -70,6 +71,8 @@ public interface Destination extends Service, Task, Message.MessageDestination { void setMemoryUsage(MemoryUsage memoryUsage); + TempUsage getTempUsage(); + void dispose(ConnectionContext context) throws IOException; boolean isDisposed(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 1897c23056..0d0db050ac 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -32,6 +32,7 @@ import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.store.MessageStore; import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.TempUsage; import org.apache.activemq.usage.Usage; import org.apache.activemq.util.SubscriptionKey; @@ -122,6 +123,11 @@ public class DestinationFilter implements Destination { next.setMemoryUsage(memoryUsage); } + @Override + public TempUsage getTempUsage() { + return next.getTempUsage(); + } + @Override public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { next.removeSubscription(context, sub, lastDeliveredSequenceId); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index a8463d3203..e8ef717011 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1030,6 +1030,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (systemUsage.getStoreUsage() != null) { systemUsage.getStoreUsage().start(); } + if (systemUsage.getTempUsage() != null) { + systemUsage.getTempUsage().start(); + } systemUsage.getMemoryUsage().addUsageListener(this); messages.start(); if (getExpireMessagesPeriod() > 0) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java index f009d62967..f901cc2f69 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java @@ -120,6 +120,23 @@ public class BrokerDestinationView { return destination.getMemoryUsage().getLimit(); } + /** + * Gets the temp usage as a percentage for this Destination. + * + * @return Gets the temp usage as a percentage for this Destination. + */ + public int getTempPercentUsage() { + return destination.getTempUsage().getPercentUsage(); + } + + /** + * Gets the temp usage limit in bytes. + * + * @return the temp usage limit in bytes. + */ + public long getTempUsageLimit() { + return destination.getTempUsage().getLimit(); + } /** * @return the average time it takes to store a message on this destination (ms) diff --git a/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java b/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java new file mode 100644 index 0000000000..5848ef51c5 --- /dev/null +++ b/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java @@ -0,0 +1,100 @@ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.*; +import javax.management.ObjectName; + +import static org.junit.Assert.assertEquals; + +/** + * Tests to ensure when the temp usage limit is updated on the broker the queues also have their + * temp usage limits automatically updated. + */ +public class AMQ7085Test +{ + private BrokerService brokerService; + private String testQueueName = "testAMQ7085Queue"; + private ActiveMQQueue queue = new ActiveMQQueue(testQueueName); + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(true); + String connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString(); + brokerService.start(); + brokerService.waitUntilStarted(); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); + final Connection conn = connectionFactory.createConnection(); + try { + conn.start(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Destination queue = session.createQueue(testQueueName); + final Message toSend = session.createMessage(); + toSend.setStringProperty("foo", "bar"); + final MessageProducer producer = session.createProducer(queue); + producer.send(queue, toSend); + } finally { + conn.close(); + } + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void testQueueTempUsageWhenSetExplicitly() throws Exception { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName()); + QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance( + queueViewMBeanName, QueueViewMBean.class, true); + + // Check that by default the queue's temp limit is the same as the broker's. + BrokerView brokerView = brokerService.getAdminView(); + long brokerTempLimit = brokerView.getTempLimit(); + assertEquals(brokerTempLimit, queueViewMBean.getTempUsageLimit()); + + // Change the queue's temp limit independently of the broker's setting and check the broker's limit does not + // change. + long queueTempLimit = brokerTempLimit + 111; + queueViewMBean.setTempUsageLimit(queueTempLimit); + assertEquals(queueViewMBean.getTempUsageLimit(), queueTempLimit); + assertEquals(brokerView.getTempLimit(), brokerTempLimit); + + // Now increase the broker's temp limit. Since the queue's limit was explicitly changed it should remain + // unchanged. + long newBrokerTempLimit = brokerTempLimit + 555; + brokerView.setTempLimit(newBrokerTempLimit); + assertEquals(brokerView.getTempLimit(), newBrokerTempLimit); + assertEquals(queueViewMBean.getTempUsageLimit(), queueTempLimit); + } + + @Test + public void testQueueTempUsageWhenBrokerTempUsageUpdated() throws Exception { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName()); + QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance( + queueViewMBeanName, QueueViewMBean.class, true); + + // Check that by default the queue's temp limit is the same as the broker's. + BrokerView brokerView = brokerService.getAdminView(); + long brokerTempLimit = brokerView.getTempLimit(); + assertEquals(brokerTempLimit, queueViewMBean.getTempUsageLimit()); + + // Increase the broker's temp limit and check the queue's limit is updated to the same value. + long newBrokerTempLimit = brokerTempLimit + 555; + brokerView.setTempLimit(newBrokerTempLimit); + assertEquals(brokerView.getTempLimit(), newBrokerTempLimit); + assertEquals(queueViewMBean.getTempUsageLimit(), newBrokerTempLimit); + } +}