ARTEMIS-1433 adding AddressMemoryUsage() and AddressMemoryUsagePercentage() to ActiveMQServerControl

This commit is contained in:
Pat Fox 2017-09-22 10:54:07 +02:00 committed by Clebert Suconic
parent d170639bfe
commit 50fcd48e28
4 changed files with 150 additions and 0 deletions

View File

@ -438,6 +438,19 @@ public interface ActiveMQServerControl {
@Attribute(desc = "global maximum limit for in-memory messages, in bytes")
long getGlobalMaxSize();
/**
* Returns the memory used by all the addresses on broker for in-memory messages
*/
@Attribute(desc = "memory used by all the addresses on broker for in-memory messages")
long getAddressMemoryUsage();
/**
* Returns the memory used by all the addresses on broker as a percentage of global maximum limit
*/
@Attribute(desc = "memory used by all the addresses on broker as a percentage of global maximum limit")
int getAddressMemoryUsagePercentage();
// Operations ----------------------------------------------------
@Operation(desc = "Isolate the broker", impact = MBeanOperationInfo.ACTION)
boolean freezeReplication();

View File

@ -577,6 +577,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Override
public long getAddressMemoryUsage() {
checkStarted();
clearIO();
try {
//this should not happen but if it does, return -1 to highlight it is not working
if (server.getPagingManager() == null) {
return -1L;
}
return server.getPagingManager().getGlobalSize();
} finally {
blockOnIO();
}
}
@Override
public int getAddressMemoryUsagePercentage() {
long globalMaxSize = getGlobalMaxSize();
// no max size set implies 0% used
if (globalMaxSize <= 0) {
return 0;
}
long memoryUsed = getAddressMemoryUsage();
if (memoryUsed <= 0) {
return 0;
}
double result = (100D * memoryUsed) / globalMaxSize;
return (int) result;
}
@Override
public boolean freezeReplication() {
Activation activation = server.getActivation();

View File

@ -20,12 +20,14 @@ import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -1125,6 +1127,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
locator.close();
}
@Test
public void testTotalMessagesAcknowledged() throws Exception {
String random1 = RandomUtil.randomString();
@ -1877,6 +1880,65 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
}
@Test
public void testMemoryUsagePercentage() throws Exception {
//messages size 100K
final int MESSAGE_SIZE = 100000;
String name1 = "messageUsagePercentage.test.1";
server.stop();
//no globalMaxSize set
server.getConfiguration().setGlobalMaxSize(-1);
server.start();
ActiveMQServerControl serverControl = createManagementControl();
// check before adding messages
assertEquals("Memory Usage before adding messages", 0, serverControl.getAddressMemoryUsage());
assertEquals("MemoryUsagePercentage", 0, serverControl.getAddressMemoryUsagePercentage());
try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator); ClientSession session = csf.createSession()) {
session.createQueue(name1, RoutingType.ANYCAST, name1);
ClientProducer producer1 = session.createProducer(name1);
sendMessagesWithPredefinedSize(30, session, producer1, MESSAGE_SIZE);
//it is hard to predict an exact number so checking if it falls in a certain range: totalSizeOfMessageSent < X > totalSizeofMessageSent + 100k
assertTrue("Memory Usage within range ", ((30 * MESSAGE_SIZE) < serverControl.getAddressMemoryUsage()) && (serverControl.getAddressMemoryUsage() < ((30 * MESSAGE_SIZE) + 100000)));
// no globalMaxSize set so it should return zero
assertEquals("MemoryUsagePercentage", 0, serverControl.getAddressMemoryUsagePercentage());
}
}
@Test
public void testMemoryUsage() throws Exception {
//messages size 100K
final int MESSAGE_SIZE = 100000;
String name1 = "messageUsage.test.1";
String name2 = "messageUsage.test.2";
server.stop();
// set to 5 MB
server.getConfiguration().setGlobalMaxSize(5000000);
server.start();
ActiveMQServerControl serverControl = createManagementControl();
// check before adding messages
assertEquals("Memory Usage before adding messages", 0, serverControl.getAddressMemoryUsage());
assertEquals("MemoryUsagePercentage", 0, serverControl.getAddressMemoryUsagePercentage());
try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator); ClientSession session = csf.createSession()) {
session.createQueue(name1, RoutingType.ANYCAST, name1);
session.createQueue(name2, RoutingType.ANYCAST, name2);
ClientProducer producer1 = session.createProducer(name1);
ClientProducer producer2 = session.createProducer(name2);
sendMessagesWithPredefinedSize(10, session, producer1, MESSAGE_SIZE);
sendMessagesWithPredefinedSize(10, session, producer2, MESSAGE_SIZE);
//it is hard to predict an exact number so checking if it falls in a certain range: totalSizeOfMessageSent < X > totalSizeofMessageSent + 100k
assertTrue("Memory Usage within range ", ((20 * MESSAGE_SIZE) < serverControl.getAddressMemoryUsage()) && (serverControl.getAddressMemoryUsage() < ((20 * MESSAGE_SIZE) + 100000)));
assertTrue("MemoryUsagePercentage", (40 <= serverControl.getAddressMemoryUsagePercentage()) && (42 >= serverControl.getAddressMemoryUsagePercentage()));
}
}
@Test
public void testConnectorServiceManagement() throws Exception {
ActiveMQServerControl managementControl = createManagementControl();
@ -1976,6 +2038,29 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
return jsonFilterObject.toString();
}
private void sendMessagesWithPredefinedSize(int numberOfMessages,
ClientSession session,
ClientProducer producer,
int messageSize) throws Exception {
ClientMessage message;
final byte[] body = new byte[messageSize];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int i = 1; i <= messageSize; i++) {
bb.put(getSamplebyte(i));
}
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
}
// Inner classes -------------------------------------------------
}

View File

@ -619,6 +619,26 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
return (Long) proxy.retrieveAttributeValue("GlobalMaxSize", Long.class);
}
@Override
public long getAddressMemoryUsage() {
try {
return (Long) proxy.invokeOperation("getAddressMemoryUsage");
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
@Override
public int getAddressMemoryUsagePercentage() {
try {
return (Integer) proxy.invokeOperation(Integer.TYPE, "getAddressMemoryUsagePercentage");
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
@Override
public boolean freezeReplication() {