AMQ-7085 - Properly start TempUsage inside Queue start

This fix allows temp usage changes to the broker temp usage to propagate
properly to a Queue's temp usage settings

Patch applied with thanks to David Sitsky
This commit is contained in:
Christopher L. Shannon (cshannon) 2019-01-17 07:58:36 -05:00
parent 9fba7ec352
commit daea192eef
8 changed files with 168 additions and 0 deletions

View File

@ -157,6 +157,21 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getProcessTime().getAverageTime();
}
@Override
public int getTempUsagePercentUsage() {
return destination.getTempUsage().getPercentUsage();
}
@Override
public long getTempUsageLimit() {
return destination.getTempUsage().getLimit();
}
@Override
public void setTempUsageLimit(long limit) {
destination.getTempUsage().setLimit(limit);
}
@Override
public long getMaxEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getMaxTime();

View File

@ -238,6 +238,24 @@ public interface DestinationViewMBean {
*/
void setMemoryLimit(long limit);
/**
* @return the percentage of amount of temp usage used
*/
@MBeanInfo("The percentage of the temp usage limit used")
int getTempUsagePercentUsage();
/**
* @return the amount of temp usage allocated to this destination
*/
@MBeanInfo("Temp usage limit, in bytes, assigned to this destination.")
long getTempUsageLimit();
/**
* set the amount of temp usage allocated to this destination
* @param limit the amount of temp usage allocated to this destination
*/
void setTempUsageLimit(long limit);
/**
* @return the portion of memory from the broker memory limit for this destination
*/

View File

@ -42,6 +42,7 @@ import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.usage.Usage;
import org.slf4j.Logger;
@ -278,6 +279,11 @@ public abstract class BaseDestination implements Destination {
this.memoryUsage = memoryUsage;
}
@Override
public TempUsage getTempUsage() {
return systemUsage.getTempUsage();
}
@Override
public DestinationStatistics getDestinationStatistics() {
return destinationStatistics;

View File

@ -33,6 +33,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.usage.Usage;
/**
@ -70,6 +71,8 @@ public interface Destination extends Service, Task, Message.MessageDestination {
void setMemoryUsage(MemoryUsage memoryUsage);
TempUsage getTempUsage();
void dispose(ConnectionContext context) throws IOException;
boolean isDisposed();

View File

@ -32,6 +32,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.SubscriptionKey;
@ -122,6 +123,11 @@ public class DestinationFilter implements Destination {
next.setMemoryUsage(memoryUsage);
}
@Override
public TempUsage getTempUsage() {
return next.getTempUsage();
}
@Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
next.removeSubscription(context, sub, lastDeliveredSequenceId);

View File

@ -1030,6 +1030,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (systemUsage.getStoreUsage() != null) {
systemUsage.getStoreUsage().start();
}
if (systemUsage.getTempUsage() != null) {
systemUsage.getTempUsage().start();
}
systemUsage.getMemoryUsage().addUsageListener(this);
messages.start();
if (getExpireMessagesPeriod() > 0) {

View File

@ -120,6 +120,23 @@ public class BrokerDestinationView {
return destination.getMemoryUsage().getLimit();
}
/**
* Gets the temp usage as a percentage for this Destination.
*
* @return Gets the temp usage as a percentage for this Destination.
*/
public int getTempPercentUsage() {
return destination.getTempUsage().getPercentUsage();
}
/**
* Gets the temp usage limit in bytes.
*
* @return the temp usage limit in bytes.
*/
public long getTempUsageLimit() {
return destination.getTempUsage().getLimit();
}
/**
* @return the average time it takes to store a message on this destination (ms)

View File

@ -0,0 +1,100 @@
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.*;
import javax.management.ObjectName;
import static org.junit.Assert.assertEquals;
/**
* Tests to ensure when the temp usage limit is updated on the broker the queues also have their
* temp usage limits automatically updated.
*/
public class AMQ7085Test
{
private BrokerService brokerService;
private String testQueueName = "testAMQ7085Queue";
private ActiveMQQueue queue = new ActiveMQQueue(testQueueName);
@Before
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
String connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
brokerService.start();
brokerService.waitUntilStarted();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
final Connection conn = connectionFactory.createConnection();
try {
conn.start();
final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination queue = session.createQueue(testQueueName);
final Message toSend = session.createMessage();
toSend.setStringProperty("foo", "bar");
final MessageProducer producer = session.createProducer(queue);
producer.send(queue, toSend);
} finally {
conn.close();
}
}
@After
public void tearDown() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
}
@Test
public void testQueueTempUsageWhenSetExplicitly() throws Exception {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(
queueViewMBeanName, QueueViewMBean.class, true);
// Check that by default the queue's temp limit is the same as the broker's.
BrokerView brokerView = brokerService.getAdminView();
long brokerTempLimit = brokerView.getTempLimit();
assertEquals(brokerTempLimit, queueViewMBean.getTempUsageLimit());
// Change the queue's temp limit independently of the broker's setting and check the broker's limit does not
// change.
long queueTempLimit = brokerTempLimit + 111;
queueViewMBean.setTempUsageLimit(queueTempLimit);
assertEquals(queueViewMBean.getTempUsageLimit(), queueTempLimit);
assertEquals(brokerView.getTempLimit(), brokerTempLimit);
// Now increase the broker's temp limit. Since the queue's limit was explicitly changed it should remain
// unchanged.
long newBrokerTempLimit = brokerTempLimit + 555;
brokerView.setTempLimit(newBrokerTempLimit);
assertEquals(brokerView.getTempLimit(), newBrokerTempLimit);
assertEquals(queueViewMBean.getTempUsageLimit(), queueTempLimit);
}
@Test
public void testQueueTempUsageWhenBrokerTempUsageUpdated() throws Exception {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(
queueViewMBeanName, QueueViewMBean.class, true);
// Check that by default the queue's temp limit is the same as the broker's.
BrokerView brokerView = brokerService.getAdminView();
long brokerTempLimit = brokerView.getTempLimit();
assertEquals(brokerTempLimit, queueViewMBean.getTempUsageLimit());
// Increase the broker's temp limit and check the queue's limit is updated to the same value.
long newBrokerTempLimit = brokerTempLimit + 555;
brokerView.setTempLimit(newBrokerTempLimit);
assertEquals(brokerView.getTempLimit(), newBrokerTempLimit);
assertEquals(queueViewMBean.getTempUsageLimit(), newBrokerTempLimit);
}
}