diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 53673b2e49..39b18f2074 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -35,6 +35,36 @@ public interface ActiveMQServerControl { @Attribute(desc = "number of clients connected to this server") int getConnectionCount(); + /** + * Returns the number of clients which have connected to this server since it was started. + */ + @Attribute(desc = "number of clients which have connected to this server since it was started") + long getTotalConnectionCount(); + + /** + * Returns the number of messages in all queues on the server. + */ + @Attribute(desc = "number of messages in all queues on the server") + long getTotalMessageCount(); + + /** + * Returns the number of messages sent to this server since it was started. + */ + @Attribute(desc = "number of messages sent to this server since it was started") + long getTotalMessagesAdded(); + + /** + * Returns the number of messages sent to this server since it was started. + */ + @Attribute(desc = "number of messages acknowledged from all the queues on this server since it was started") + long getTotalMessagesAcknowledged(); + + /** + * Returns the number of messages sent to this server since it was started. + */ + @Attribute(desc = "number of consumers consuming messages from all the queues on this server") + long getTotalConsumerCount(); + /** * Return whether this server is started. */ @@ -354,6 +384,18 @@ public interface ActiveMQServerControl { @Attribute(desc = "names of the queues created on this server") String[] getQueueNames(); + /** + * Returns the uptime of this server. + */ + @Attribute(desc = "uptime of this server") + String getUptime(); + + /** + * Returns the uptime of this server. + */ + @Attribute(desc = "uptime of this server in milliseconds") + long getUptimeMillis(); + // Operations ---------------------------------------------------- /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index ae16891a3a..6e7f4dc156 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -643,6 +643,32 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } + @Override + public String getUptime() { + checkStarted(); + + clearIO(); + try { + return server.getUptime(); + } + finally { + blockOnIO(); + } + } + + @Override + public long getUptimeMillis() { + checkStarted(); + + clearIO(); + try { + return server.getUptimeMillis(); + } + finally { + blockOnIO(); + } + } + @Override public String[] getAddressNames() { checkStarted(); @@ -691,6 +717,71 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } + @Override + public long getTotalConnectionCount() { + checkStarted(); + + clearIO(); + try { + return server.getTotalConnectionCount(); + } + finally { + blockOnIO(); + } + } + + @Override + public long getTotalMessageCount() { + checkStarted(); + + clearIO(); + try { + return server.getTotalMessageCount(); + } + finally { + blockOnIO(); + } + } + + @Override + public long getTotalMessagesAdded() { + checkStarted(); + + clearIO(); + try { + return server.getTotalMessagesAdded(); + } + finally { + blockOnIO(); + } + } + + @Override + public long getTotalMessagesAcknowledged() { + checkStarted(); + + clearIO(); + try { + return server.getTotalMessagesAcknowledged(); + } + finally { + blockOnIO(); + } + } + + @Override + public long getTotalConsumerCount() { + checkStarted(); + + clearIO(); + try { + return server.getTotalConsumerCount(); + } + finally { + blockOnIO(); + } + } + @Override public void enableMessageCounters() { checkStarted(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java index 631a71a6c0..061e5a67c5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java @@ -40,6 +40,8 @@ public interface RemotingService { Set getConnections(); + long getTotalConnectionCount(); + ReusableLatch getConnectionCountLatch(); void addIncomingInterceptor(BaseInterceptor interceptor); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 2ed6e3a501..93fd4aec43 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -115,6 +116,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif private boolean paused = false; + private AtomicLong totalConnectionCount = new AtomicLong(0); + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -444,6 +447,11 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif return conns; } + @Override + public long getTotalConnectionCount() { + return totalConnectionCount.get(); + } + @Override public synchronized ReusableLatch getConnectionCountLatch() { return connectionCountLatch; @@ -471,6 +479,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif connections.put(connection.getID(), entry); connectionCountLatch.countUp(); + totalConnectionCount.incrementAndGet(); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index b47df20567..2b0e322564 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -153,6 +153,16 @@ public interface ActiveMQServer extends ActiveMQComponent { int getConnectionCount(); + long getTotalConnectionCount(); + + long getTotalMessageCount(); + + long getTotalMessagesAdded(); + + long getTotalMessagesAcknowledged(); + + long getTotalConsumerCount(); + PostOffice getPostOffice(); QueueFactory getQueueFactory(); @@ -172,6 +182,10 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean isActive(); + String getUptime(); + + long getUptimeMillis(); + /** * This is the queue creator responsible for JMS Queue creations* * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 96f661aef7..8914886060 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1243,6 +1243,63 @@ public class ActiveMQServerImpl implements ActiveMQServer { return remotingService.getConnections().size(); } + @Override + public long getTotalConnectionCount() { + return remotingService.getTotalConnectionCount(); + } + + @Override + public long getTotalMessageCount() { + long total = 0; + + for (Binding binding : postOffice.getAllBindings().values()) { + if (binding.getType() == BindingType.LOCAL_QUEUE) { + total += ((LocalQueueBinding)binding).getQueue().getMessageCount(); + } + } + + return total; + } + + @Override + public long getTotalMessagesAdded() { + long total = 0; + + for (Binding binding : postOffice.getAllBindings().values()) { + if (binding.getType() == BindingType.LOCAL_QUEUE) { + total += ((LocalQueueBinding)binding).getQueue().getMessagesAdded(); + } + } + + return total; + } + + @Override + public long getTotalMessagesAcknowledged() { + long total = 0; + + for (Binding binding : postOffice.getAllBindings().values()) { + if (binding.getType() == BindingType.LOCAL_QUEUE) { + total += ((LocalQueueBinding)binding).getQueue().getMessagesAcknowledged(); + } + } + + return total; + } + + @Override + public long getTotalConsumerCount() { + long total = 0; + + for (Binding binding : postOffice.getAllBindings().values()) { + if (binding.getType() == BindingType.LOCAL_QUEUE) { + total += ((LocalQueueBinding)binding).getQueue().getConsumerCount(); + } + } + + return total; + } + @Override public PostOffice getPostOffice() { return postOffice; @@ -2195,6 +2252,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + @Override public String getUptime() { long delta = getUptimeMillis(); @@ -2205,6 +2263,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { return TimeUtils.printDuration(delta); } + @Override public long getUptimeMillis() { if (startDate == null) { return 0; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 9c4bd2e766..b66636a754 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -880,6 +880,181 @@ public class ActiveMQServerControlTest extends ManagementTestBase { assertFalse(server.isStarted()); } + @Test + public void testTotalMessageCount() throws Exception { + String random1 = RandomUtil.randomString(); + String random2 = RandomUtil.randomString(); + + ActiveMQServerControl serverControl = createManagementControl(); + + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory csf = createSessionFactory(locator); + ClientSession session = csf.createSession(); + + session.createQueue(random1, random1); + session.createQueue(random2, random2); + + ClientProducer producer1 = session.createProducer(random1); + ClientProducer producer2 = session.createProducer(random2); + ClientMessage message = session.createMessage(false); + producer1.send(message); + producer2.send(message); + + session.commit(); + + assertEquals(2, serverControl.getTotalMessageCount()); + + session.deleteQueue(random1); + session.deleteQueue(random2); + + session.close(); + + locator.close(); + } + + @Test + public void testTotalConnectionCount() throws Exception { + final int CONNECTION_COUNT = 100; + + ActiveMQServerControl serverControl = createManagementControl(); + + ServerLocator locator = createInVMNonHALocator(); + for (int i = 0; i < CONNECTION_COUNT; i++) { + createSessionFactory(locator).close(); + } + + assertEquals(CONNECTION_COUNT, serverControl.getTotalConnectionCount()); + assertEquals(0, serverControl.getConnectionCount()); + + locator.close(); + } + + @Test + public void testTotalMessagesAdded() throws Exception { + String random1 = RandomUtil.randomString(); + String random2 = RandomUtil.randomString(); + + ActiveMQServerControl serverControl = createManagementControl(); + + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory csf = createSessionFactory(locator); + ClientSession session = csf.createSession(); + + session.createQueue(random1, random1); + session.createQueue(random2, random2); + + ClientProducer producer1 = session.createProducer(random1); + ClientProducer producer2 = session.createProducer(random2); + ClientMessage message = session.createMessage(false); + producer1.send(message); + producer2.send(message); + + session.commit(); + + ClientConsumer consumer1 = session.createConsumer(random1); + ClientConsumer consumer2 = session.createConsumer(random2); + + session.start(); + + assertNotNull(consumer1.receive().acknowledge()); + assertNotNull(consumer2.receive().acknowledge()); + + session.commit(); + + assertEquals(2, serverControl.getTotalMessagesAdded()); + assertEquals(0, serverControl.getTotalMessageCount()); + + consumer1.close(); + consumer2.close(); + + session.deleteQueue(random1); + session.deleteQueue(random2); + + session.close(); + + locator.close(); + } + + @Test + public void testTotalMessagesAcknowledged() throws Exception { + String random1 = RandomUtil.randomString(); + String random2 = RandomUtil.randomString(); + + ActiveMQServerControl serverControl = createManagementControl(); + + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory csf = createSessionFactory(locator); + ClientSession session = csf.createSession(); + + session.createQueue(random1, random1); + session.createQueue(random2, random2); + + ClientProducer producer1 = session.createProducer(random1); + ClientProducer producer2 = session.createProducer(random2); + ClientMessage message = session.createMessage(false); + producer1.send(message); + producer2.send(message); + + session.commit(); + + ClientConsumer consumer1 = session.createConsumer(random1); + ClientConsumer consumer2 = session.createConsumer(random2); + + session.start(); + + assertNotNull(consumer1.receive().acknowledge()); + assertNotNull(consumer2.receive().acknowledge()); + + session.commit(); + + assertEquals(2, serverControl.getTotalMessagesAcknowledged()); + assertEquals(0, serverControl.getTotalMessageCount()); + + consumer1.close(); + consumer2.close(); + + session.deleteQueue(random1); + session.deleteQueue(random2); + + session.close(); + + locator.close(); + } + + @Test + public void testTotalConsumerCount() throws Exception { + String random1 = RandomUtil.randomString(); + String random2 = RandomUtil.randomString(); + + ActiveMQServerControl serverControl = createManagementControl(); + QueueControl queueControl1 = ManagementControlHelper.createQueueControl(SimpleString.toSimpleString(random1), SimpleString.toSimpleString(random1), mbeanServer); + QueueControl queueControl2 = ManagementControlHelper.createQueueControl(SimpleString.toSimpleString(random2), SimpleString.toSimpleString(random2), mbeanServer); + + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory csf = createSessionFactory(locator); + ClientSession session = csf.createSession(); + + session.createQueue(random1, random1); + session.createQueue(random2, random2); + + ClientConsumer consumer1 = session.createConsumer(random1); + ClientConsumer consumer2 = session.createConsumer(random2); + + assertEquals(2, serverControl.getTotalConsumerCount()); + assertEquals(1, queueControl1.getConsumerCount()); + assertEquals(1, queueControl2.getConsumerCount()); + + consumer1.close(); + consumer2.close(); + + session.deleteQueue(random1); + session.deleteQueue(random2); + + session.close(); + + locator.close(); + } + protected void scaleDown(ScaleDownHandler handler) throws Exception { SimpleString address = new SimpleString("testQueue"); HashMap params = new HashMap<>(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 9ed5433457..7fb5a5b8d7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -178,6 +178,31 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes return (Integer) proxy.retrieveAttributeValue("connectionCount"); } + @Override + public long getTotalConnectionCount() { + return (Long) proxy.retrieveAttributeValue("totalConnectionCount", Long.class); + } + + @Override + public long getTotalMessageCount() { + return (Long) proxy.retrieveAttributeValue("totalMessageCount", Long.class); + } + + @Override + public long getTotalMessagesAdded() { + return (Long) proxy.retrieveAttributeValue("totalMessagesAdded", Long.class); + } + + @Override + public long getTotalMessagesAcknowledged() { + return (Long) proxy.retrieveAttributeValue("totalMessagesAcknowledged", Long.class); + } + + @Override + public long getTotalConsumerCount() { + return (Long) proxy.retrieveAttributeValue("totalConsumerCount", Long.class); + } + @Override public long getConnectionTTLOverride() { return (Long) proxy.retrieveAttributeValue("connectionTTLOverride", Long.class); @@ -203,6 +228,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes return ActiveMQServerControlUsingCoreTest.toStringArray((Object[]) proxy.retrieveAttributeValue("queueNames")); } + @Override + public String getUptime() { + return null; + } + + @Override + public long getUptimeMillis() { + return 0; + } + @Override public int getIDCacheSize() { return (Integer) proxy.retrieveAttributeValue("IDCacheSize");