This closes #473

This commit is contained in:
Clebert Suconic 2016-04-19 23:00:32 -04:00
commit 17fe4707f4
8 changed files with 427 additions and 0 deletions

View File

@ -35,6 +35,36 @@ public interface ActiveMQServerControl {
@Attribute(desc = "number of clients connected to this server")
int getConnectionCount();
/**
* Returns the number of clients which have connected to this server since it was started.
*/
@Attribute(desc = "number of clients which have connected to this server since it was started")
long getTotalConnectionCount();
/**
* Returns the number of messages in all queues on the server.
*/
@Attribute(desc = "number of messages in all queues on the server")
long getTotalMessageCount();
/**
* Returns the number of messages sent to this server since it was started.
*/
@Attribute(desc = "number of messages sent to this server since it was started")
long getTotalMessagesAdded();
/**
* Returns the number of messages sent to this server since it was started.
*/
@Attribute(desc = "number of messages acknowledged from all the queues on this server since it was started")
long getTotalMessagesAcknowledged();
/**
* Returns the number of messages sent to this server since it was started.
*/
@Attribute(desc = "number of consumers consuming messages from all the queues on this server")
long getTotalConsumerCount();
/**
* Return whether this server is started.
*/
@ -354,6 +384,18 @@ public interface ActiveMQServerControl {
@Attribute(desc = "names of the queues created on this server")
String[] getQueueNames();
/**
* Returns the uptime of this server.
*/
@Attribute(desc = "uptime of this server")
String getUptime();
/**
* Returns the uptime of this server.
*/
@Attribute(desc = "uptime of this server in milliseconds")
long getUptimeMillis();
// Operations ----------------------------------------------------
/**

View File

@ -643,6 +643,32 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Override
public String getUptime() {
checkStarted();
clearIO();
try {
return server.getUptime();
}
finally {
blockOnIO();
}
}
@Override
public long getUptimeMillis() {
checkStarted();
clearIO();
try {
return server.getUptimeMillis();
}
finally {
blockOnIO();
}
}
@Override
public String[] getAddressNames() {
checkStarted();
@ -691,6 +717,71 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Override
public long getTotalConnectionCount() {
checkStarted();
clearIO();
try {
return server.getTotalConnectionCount();
}
finally {
blockOnIO();
}
}
@Override
public long getTotalMessageCount() {
checkStarted();
clearIO();
try {
return server.getTotalMessageCount();
}
finally {
blockOnIO();
}
}
@Override
public long getTotalMessagesAdded() {
checkStarted();
clearIO();
try {
return server.getTotalMessagesAdded();
}
finally {
blockOnIO();
}
}
@Override
public long getTotalMessagesAcknowledged() {
checkStarted();
clearIO();
try {
return server.getTotalMessagesAcknowledged();
}
finally {
blockOnIO();
}
}
@Override
public long getTotalConsumerCount() {
checkStarted();
clearIO();
try {
return server.getTotalConsumerCount();
}
finally {
blockOnIO();
}
}
@Override
public void enableMessageCounters() {
checkStarted();

View File

@ -40,6 +40,8 @@ public interface RemotingService {
Set<RemotingConnection> getConnections();
long getTotalConnectionCount();
ReusableLatch getConnectionCountLatch();
void addIncomingInterceptor(BaseInterceptor interceptor);

View File

@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -115,6 +116,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
private boolean paused = false;
private AtomicLong totalConnectionCount = new AtomicLong(0);
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -444,6 +447,11 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
return conns;
}
@Override
public long getTotalConnectionCount() {
return totalConnectionCount.get();
}
@Override
public synchronized ReusableLatch getConnectionCountLatch() {
return connectionCountLatch;
@ -471,6 +479,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
connections.put(connection.getID(), entry);
connectionCountLatch.countUp();
totalConnectionCount.incrementAndGet();
}
@Override

View File

@ -153,6 +153,16 @@ public interface ActiveMQServer extends ActiveMQComponent {
int getConnectionCount();
long getTotalConnectionCount();
long getTotalMessageCount();
long getTotalMessagesAdded();
long getTotalMessagesAcknowledged();
long getTotalConsumerCount();
PostOffice getPostOffice();
QueueFactory getQueueFactory();
@ -172,6 +182,10 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean isActive();
String getUptime();
long getUptimeMillis();
/**
* This is the queue creator responsible for JMS Queue creations*
*

View File

@ -1243,6 +1243,63 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return remotingService.getConnections().size();
}
@Override
public long getTotalConnectionCount() {
return remotingService.getTotalConnectionCount();
}
@Override
public long getTotalMessageCount() {
long total = 0;
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding)binding).getQueue().getMessageCount();
}
}
return total;
}
@Override
public long getTotalMessagesAdded() {
long total = 0;
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding)binding).getQueue().getMessagesAdded();
}
}
return total;
}
@Override
public long getTotalMessagesAcknowledged() {
long total = 0;
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding)binding).getQueue().getMessagesAcknowledged();
}
}
return total;
}
@Override
public long getTotalConsumerCount() {
long total = 0;
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding)binding).getQueue().getConsumerCount();
}
}
return total;
}
@Override
public PostOffice getPostOffice() {
return postOffice;
@ -2195,6 +2252,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
@Override
public String getUptime() {
long delta = getUptimeMillis();
@ -2205,6 +2263,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return TimeUtils.printDuration(delta);
}
@Override
public long getUptimeMillis() {
if (startDate == null) {
return 0;

View File

@ -880,6 +880,181 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
assertFalse(server.isStarted());
}
@Test
public void testTotalMessageCount() throws Exception {
String random1 = RandomUtil.randomString();
String random2 = RandomUtil.randomString();
ActiveMQServerControl serverControl = createManagementControl();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession();
session.createQueue(random1, random1);
session.createQueue(random2, random2);
ClientProducer producer1 = session.createProducer(random1);
ClientProducer producer2 = session.createProducer(random2);
ClientMessage message = session.createMessage(false);
producer1.send(message);
producer2.send(message);
session.commit();
assertEquals(2, serverControl.getTotalMessageCount());
session.deleteQueue(random1);
session.deleteQueue(random2);
session.close();
locator.close();
}
@Test
public void testTotalConnectionCount() throws Exception {
final int CONNECTION_COUNT = 100;
ActiveMQServerControl serverControl = createManagementControl();
ServerLocator locator = createInVMNonHALocator();
for (int i = 0; i < CONNECTION_COUNT; i++) {
createSessionFactory(locator).close();
}
assertEquals(CONNECTION_COUNT, serverControl.getTotalConnectionCount());
assertEquals(0, serverControl.getConnectionCount());
locator.close();
}
@Test
public void testTotalMessagesAdded() throws Exception {
String random1 = RandomUtil.randomString();
String random2 = RandomUtil.randomString();
ActiveMQServerControl serverControl = createManagementControl();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession();
session.createQueue(random1, random1);
session.createQueue(random2, random2);
ClientProducer producer1 = session.createProducer(random1);
ClientProducer producer2 = session.createProducer(random2);
ClientMessage message = session.createMessage(false);
producer1.send(message);
producer2.send(message);
session.commit();
ClientConsumer consumer1 = session.createConsumer(random1);
ClientConsumer consumer2 = session.createConsumer(random2);
session.start();
assertNotNull(consumer1.receive().acknowledge());
assertNotNull(consumer2.receive().acknowledge());
session.commit();
assertEquals(2, serverControl.getTotalMessagesAdded());
assertEquals(0, serverControl.getTotalMessageCount());
consumer1.close();
consumer2.close();
session.deleteQueue(random1);
session.deleteQueue(random2);
session.close();
locator.close();
}
@Test
public void testTotalMessagesAcknowledged() throws Exception {
String random1 = RandomUtil.randomString();
String random2 = RandomUtil.randomString();
ActiveMQServerControl serverControl = createManagementControl();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession();
session.createQueue(random1, random1);
session.createQueue(random2, random2);
ClientProducer producer1 = session.createProducer(random1);
ClientProducer producer2 = session.createProducer(random2);
ClientMessage message = session.createMessage(false);
producer1.send(message);
producer2.send(message);
session.commit();
ClientConsumer consumer1 = session.createConsumer(random1);
ClientConsumer consumer2 = session.createConsumer(random2);
session.start();
assertNotNull(consumer1.receive().acknowledge());
assertNotNull(consumer2.receive().acknowledge());
session.commit();
assertEquals(2, serverControl.getTotalMessagesAcknowledged());
assertEquals(0, serverControl.getTotalMessageCount());
consumer1.close();
consumer2.close();
session.deleteQueue(random1);
session.deleteQueue(random2);
session.close();
locator.close();
}
@Test
public void testTotalConsumerCount() throws Exception {
String random1 = RandomUtil.randomString();
String random2 = RandomUtil.randomString();
ActiveMQServerControl serverControl = createManagementControl();
QueueControl queueControl1 = ManagementControlHelper.createQueueControl(SimpleString.toSimpleString(random1), SimpleString.toSimpleString(random1), mbeanServer);
QueueControl queueControl2 = ManagementControlHelper.createQueueControl(SimpleString.toSimpleString(random2), SimpleString.toSimpleString(random2), mbeanServer);
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession();
session.createQueue(random1, random1);
session.createQueue(random2, random2);
ClientConsumer consumer1 = session.createConsumer(random1);
ClientConsumer consumer2 = session.createConsumer(random2);
assertEquals(2, serverControl.getTotalConsumerCount());
assertEquals(1, queueControl1.getConsumerCount());
assertEquals(1, queueControl2.getConsumerCount());
consumer1.close();
consumer2.close();
session.deleteQueue(random1);
session.deleteQueue(random2);
session.close();
locator.close();
}
protected void scaleDown(ScaleDownHandler handler) throws Exception {
SimpleString address = new SimpleString("testQueue");
HashMap<String, Object> params = new HashMap<>();

View File

@ -178,6 +178,31 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
return (Integer) proxy.retrieveAttributeValue("connectionCount");
}
@Override
public long getTotalConnectionCount() {
return (Long) proxy.retrieveAttributeValue("totalConnectionCount", Long.class);
}
@Override
public long getTotalMessageCount() {
return (Long) proxy.retrieveAttributeValue("totalMessageCount", Long.class);
}
@Override
public long getTotalMessagesAdded() {
return (Long) proxy.retrieveAttributeValue("totalMessagesAdded", Long.class);
}
@Override
public long getTotalMessagesAcknowledged() {
return (Long) proxy.retrieveAttributeValue("totalMessagesAcknowledged", Long.class);
}
@Override
public long getTotalConsumerCount() {
return (Long) proxy.retrieveAttributeValue("totalConsumerCount", Long.class);
}
@Override
public long getConnectionTTLOverride() {
return (Long) proxy.retrieveAttributeValue("connectionTTLOverride", Long.class);
@ -203,6 +228,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
return ActiveMQServerControlUsingCoreTest.toStringArray((Object[]) proxy.retrieveAttributeValue("queueNames"));
}
@Override
public String getUptime() {
return null;
}
@Override
public long getUptimeMillis() {
return 0;
}
@Override
public int getIDCacheSize() {
return (Integer) proxy.retrieveAttributeValue("IDCacheSize");