From 9c62531c2fe000a3c07feba073332a6829463fb3 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 29 Jun 2018 09:19:15 -0500 Subject: [PATCH] ARTEMIS-1961 track routed and unrouted messages sent to an address --- .../api/core/management/AddressControl.java | 12 ++++++++ .../management/impl/AddressControlImpl.java | 10 +++++++ .../core/postoffice/impl/PostOfficeImpl.java | 29 +++++++++--------- .../artemis/core/server/impl/AddressInfo.java | 25 ++++++++++++++++ .../management/AddressControlTest.java | 30 +++++++++++++++++++ .../AddressControlUsingCoreTest.java | 10 +++++++ 6 files changed, 102 insertions(+), 14 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index 93a2822565..4bb100b5a9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -104,6 +104,18 @@ public interface AddressControl { @Attribute(desc = "number of messages added to all the queues for this address") long getMessageCount(); + /** + * Returns the number of messages routed to one or more bindings + */ + @Attribute(desc = "number of messages routed to one or more bindings") + long getRoutedMessageCount(); + + /** + * Returns the number of messages not routed to any bindings + */ + @Attribute(desc = "number of messages not routed to any bindings") + long getUnRoutedMessageCount(); + /** * @param headers the message headers and properties to set. Can only diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 82c6a6c994..0eb39e0ea3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -267,6 +267,16 @@ public class AddressControlImpl extends AbstractControl implements AddressContro return getMessageCount(DurabilityType.ALL); } + @Override + public long getRoutedMessageCount() { + return addressInfo.getRoutedMessageCount(); + } + + @Override + public long getUnRoutedMessageCount() { + return addressInfo.getUnRoutedMessageCount(); + } + @Override public String sendMessage(final Map headers, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 02abf466c6..ec451f7198 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -842,12 +842,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding throw new IllegalStateException("Message cannot be routed more than once"); } - setPagingStore(context.getAddress(message), message); + final SimpleString address = context.getAddress(message); + + setPagingStore(address, message); AtomicBoolean startedTX = new AtomicBoolean(false); - final SimpleString address = context.getAddress(message); - applyExpiryDelay(message, address); if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) { @@ -856,23 +856,24 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding message.cleanupInternalProperties(); - Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message)); + Bindings bindings = addressManager.getBindingsForRoutingAddress(address); + + AddressInfo addressInfo = addressManager.getAddressInfo(address); - // TODO auto-create queues here? - // first check for the auto-queue creation thing - if (bindings == null) { - // There is no queue with this address, we will check if it needs to be created - // if (queueCreator.create(address)) { - // TODO: this is not working!!!! - // reassign bindings if it was created - // bindings = addressManager.getBindingsForRoutingAddress(address); - // } - } if (bindingMove != null) { bindingMove.route(message, context); + if (addressInfo != null) { + addressInfo.incrementRoutedMessageCount(); + } } else if (bindings != null) { bindings.route(message, context); + if (addressInfo != null) { + addressInfo.incrementRoutedMessageCount(); + } } else { + if (addressInfo != null) { + addressInfo.incrementUnRoutedMessageCount(); + } // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS) if (logger.isDebugEnabled()) { logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 4bc540fa3d..0cf94523f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.utils.PrefixUtil; import java.util.EnumSet; import java.util.Map; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; public class AddressInfo { @@ -36,6 +37,14 @@ public class AddressInfo { private boolean internal = false; + private volatile long routedMessageCount = 0; + + private static final AtomicLongFieldUpdater routedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "routedMessageCount"); + + private volatile long unRoutedMessageCount = 0; + + private static final AtomicLongFieldUpdater unRoutedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "unRoutedMessageCount"); + public AddressInfo(SimpleString name) { this(name, EnumSet.noneOf(RoutingType.class)); } @@ -155,4 +164,20 @@ public class AddressInfo { return this; } + public long incrementRoutedMessageCount() { + return routedMessageCountUpdater.incrementAndGet(this); + } + + public long incrementUnRoutedMessageCount() { + return unRoutedMessageCountUpdater.incrementAndGet(this); + } + + public long getRoutedMessageCount() { + return routedMessageCountUpdater.get(this); + } + + public long getUnRoutedMessageCount() { + return unRoutedMessageCountUpdater.get(this); + } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 9f66bcfb32..69794d8bdb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -340,6 +340,36 @@ public class AddressControlTest extends ManagementTestBase { assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100)); } + @Test + public void testGetRoutedMessageCounts() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + session.createAddress(address, RoutingType.ANYCAST, false); + + AddressControl addressControl = createManagementControl(address); + assertEquals(0, addressControl.getMessageCount()); + + ClientProducer producer = session.createProducer(address.toString()); + producer.send(session.createMessage(false)); + assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 0, 2000, 100)); + assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100)); + + session.createQueue(address, RoutingType.ANYCAST, address); + producer.send(session.createMessage(false)); + assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 1, 2000, 100)); + assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100)); + + session.createQueue(address, RoutingType.ANYCAST, address.concat('2')); + producer.send(session.createMessage(false)); + assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100)); + assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100)); + + session.deleteQueue(address); + session.deleteQueue(address.concat('2')); + producer.send(session.createMessage(false)); + assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100)); + assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 2, 2000, 100)); + } + @Test public void testSendMessage() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java index d9c9f2e450..5e5dc5438c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java @@ -103,6 +103,16 @@ public class AddressControlUsingCoreTest extends AddressControlTest { return (long) proxy.retrieveAttributeValue("messageCount"); } + @Override + public long getRoutedMessageCount() { + return (long) proxy.retrieveAttributeValue("routedMessageCount"); + } + + @Override + public long getUnRoutedMessageCount() { + return (long) proxy.retrieveAttributeValue("unRoutedMessageCount"); + } + @Override public String sendMessage(Map headers, int type,