From 677d71b8e7b3bb0afd2fd618af4b86657324e50b Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 14 Jul 2023 16:49:31 -0400 Subject: [PATCH] ARTEMIS-4366 Missing Mirrored ACKs with MULTICAST and subscriptions --- .../artemis/cli/commands/AbstractAction.java | 17 +- .../utils/collections/NodeStoreFactory.java | 21 ++ .../api/core/management/ManagementHelper.java | 37 +++ .../amqp/broker/ProtonProtocolManager.java | 8 +- .../mirror/AMQPMirrorControllerSource.java | 6 +- .../mirror/AMQPMirrorControllerTarget.java | 12 +- .../connect/mirror/ReferenceNodeStore.java | 41 +--- .../mirror/ReferenceNodeStoreFactory.java | 82 +++++++ .../activemq/artemis/core/server/Queue.java | 3 +- .../artemis/core/server/impl/QueueImpl.java | 9 +- .../core/server/impl/RoutingContextTest.java | 4 +- .../impl/ScheduledDeliveryHandlerTest.java | 4 +- .../amqp/connect/AMQPReplicaTest.java | 221 +++++++++++++++++ tests/smoke-tests/pom.xml | 32 +++ .../mirrored-subscriptions/broker1/broker.xml | 226 ++++++++++++++++++ .../mirrored-subscriptions/broker2/broker.xml | 220 +++++++++++++++++ .../MirroredSubscriptionTest.java | 164 +++++++++++++ .../tests/smoke/common/SimpleManagement.java | 68 ++++++ .../unit/core/postoffice/impl/FakeQueue.java | 4 +- 19 files changed, 1107 insertions(+), 72 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java create mode 100644 tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml create mode 100644 tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml create mode 100644 tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java create mode 100644 tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java index 37f08c35f1..3eebaf4e2c 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.cli.commands; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientMessage; -import org.apache.activemq.artemis.api.core.client.ClientRequestor; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; @@ -28,25 +27,13 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; public abstract class AbstractAction extends ConnectionAbstract { + // TODO: This call could be replaced by a direct call into ManagementHelpr.doManagement and their lambdas public void performCoreManagement(ManagementCallback cb) throws Exception { - try (ActiveMQConnectionFactory factory = createCoreConnectionFactory(); ServerLocator locator = factory.getServerLocator(); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) { - session.start(); - ClientRequestor requestor = new ClientRequestor(session, "activemq.management"); - ClientMessage message = session.createMessage(false); - - cb.setUpInvocation(message); - - ClientMessage reply = requestor.request(message); - - if (ManagementHelper.hasOperationSucceeded(reply)) { - cb.requestSuccessful(reply); - } else { - cb.requestFailed(reply); - } + ManagementHelper.doManagement(session, cb::setUpInvocation, cb::requestSuccessful, cb::requestFailed); } } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java new file mode 100644 index 0000000000..2bd6c9c292 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +public interface NodeStoreFactory { + NodeStore newNodeStore(); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java index 678df28625..69a16a8f4b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java @@ -16,6 +16,13 @@ */ package org.apache.activemq.artemis.api.core.management; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientRequestor; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.json.JsonArray; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -87,6 +94,36 @@ public final class ManagementHelper { public static final SimpleString HDR_CLIENT_ID = new SimpleString("_AMQ_Client_ID"); + // Lambda declaration for management function. Pretty much same thing as java.util.function.Consumer but with an exception in the declaration that was needed. + public interface MessageAcceptor { + void accept(ClientMessage message) throws Exception; + } + + /** Utility function to connect to a server and perform a management operation via core. */ + public static void doManagement(String uri, String user, String password, MessageAcceptor setup, MessageAcceptor ok, MessageAcceptor failed) throws Exception { + try (ServerLocator locator = ServerLocatorImpl.newLocator(uri); + ClientSessionFactory sessionFactory = locator.createSessionFactory(); + ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) { + doManagement(session, setup, ok, failed); + } + } + + /** Utility function to reuse a ClientSessionConnection and perform a single management operation via core. */ + public static void doManagement(ClientSession session, MessageAcceptor setup, MessageAcceptor ok, MessageAcceptor failed) throws Exception { + session.start(); + ClientRequestor requestor = new ClientRequestor(session, "activemq.management"); + ClientMessage message = session.createMessage(false); + + setup.accept(message); + + ClientMessage reply = requestor.request(message); + + if (ManagementHelper.hasOperationSucceeded(reply)) { + ok.accept(reply); + } else { + failed.accept(reply); + } + } /** * Stores a resource attribute in a message to retrieve the value from the server resource. diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index a34184ae6b..9355bbdff1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; -import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStore; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStoreFactory; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler; @@ -75,7 +75,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager im final Queue snfQueue; final ActiveMQServer server; - final ReferenceNodeStore idSupplier; + final ReferenceNodeStoreFactory idSupplier; final boolean acks; final boolean addQueues; final boolean deleteQueues; @@ -324,14 +324,14 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im } } - public static void validateProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref, SimpleString snfAddress) { + public static void validateProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref, SimpleString snfAddress) { if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) { setProtocolData(referenceIDSupplier, ref); } } /** This method will return the brokerID used by the message */ - private static String setProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref) { + private static String setProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref) { String brokerID = referenceIDSupplier.getServerID(ref); long id = referenceIDSupplier.getID(ref); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 55f35bf645..fa168005a0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -162,7 +162,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement DuplicateIDCache lruduplicateIDCache; String lruDuplicateIDKey; - private final ReferenceNodeStore referenceNodeStore; + private final ReferenceNodeStoreFactory referenceNodeStore; OperationContext mirrorContext; @@ -367,15 +367,17 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) { - MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore); if (logger.isTraceEnabled()) { - logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}). Ref={}", nodeID, messageID, targetQueue.getName(), reference); + logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", nodeID, messageID, targetQueue.getName()); } + MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore); + + if (reference == null) { if (logger.isDebugEnabled()) { - logger.debug("Retrying Reference not found on messageID={}, nodeID={}, currentRetry={}", messageID, nodeID, retry); + logger.debug("Retrying Reference not found on messageID={}, nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry); } switch (retry) { case 0: @@ -404,7 +406,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement if (reference != null) { if (logger.isTraceEnabled()) { - logger.trace("Post ack Server {} worked well for messageID={} nodeID={}", server, messageID, nodeID); + logger.trace("Post ack Server {} worked well for messageID={} nodeID={} queue={}, targetQueue={}", server, messageID, nodeID, reference.getQueue(), targetQueue); } try { switch (reason) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java index d82560cace..a9ae71a273 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java @@ -20,20 +20,16 @@ import java.util.HashMap; import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; -import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; -import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; - public class ReferenceNodeStore implements NodeStore { - private final String serverID; + private final ReferenceNodeStoreFactory factory; - public ReferenceNodeStore(ActiveMQServer server) { - this.serverID = server.getNodeID().toString(); + public ReferenceNodeStore(ReferenceNodeStoreFactory factory) { + this.factory = factory; } // This is where the messages are stored by server id... @@ -43,10 +39,6 @@ public class ReferenceNodeStore implements NodeStore { LongObjectHashMap> lruMap; - public String getDefaultNodeID() { - return serverID; - } - @Override public void storeNode(MessageReference element, LinkedListImpl.Node node) { String list = getServerID(element); @@ -90,7 +82,7 @@ public class ReferenceNodeStore implements NodeStore { /** notice getMap should always return an instance. It should never return null. */ private synchronized LongObjectHashMap> getMap(String serverID) { if (serverID == null) { - serverID = this.serverID; // returning for the localList in case it's null + serverID = factory.getDefaultNodeID(); } if (lruListID != null && lruListID.equals(serverID)) { @@ -113,34 +105,15 @@ public class ReferenceNodeStore implements NodeStore { } public String getServerID(MessageReference element) { - return getServerID(element.getMessage()); + return factory.getServerID(element); } - public String getServerID(Message message) { - Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY); - if (nodeID != null) { - return nodeID.toString(); - } else { - // it is important to return null here, as the MirrorSource is expecting it to be null - // in the case the nodeID being from the originating server. - // don't be tempted to return this.serverID here. - return null; - } + return factory.getServerID(message); } public long getID(MessageReference element) { - Message message = element.getMessage(); - Long id = getID(message); - if (id == null) { - return element.getMessageID(); - } else { - return id; - } - } - - private Long getID(Message message) { - return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY); + return factory.getID(element); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java new file mode 100644 index 0000000000..2782f85175 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.connect.mirror; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.utils.collections.NodeStore; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; + +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; + +public class ReferenceNodeStoreFactory implements NodeStoreFactory { + + final ActiveMQServer server; + + private final String serverID; + + public ReferenceNodeStoreFactory(ActiveMQServer server) { + this.server = server; + this.serverID = server.getNodeID().toString(); + + } + + @Override + public NodeStore newNodeStore() { + return new ReferenceNodeStore(this); + } + + public String getDefaultNodeID() { + return serverID; + } + + public String getServerID(MessageReference element) { + return getServerID(element.getMessage()); + } + + + public String getServerID(Message message) { + Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY); + if (nodeID != null) { + return nodeID.toString(); + } else { + // it is important to return null here, as the MirrorSource is expecting it to be null + // in the case the nodeID being from the originating server. + // don't be tempted to return this.serverID here. + return null; + } + } + + public long getID(MessageReference element) { + Message message = element.getMessage(); + Long id = getID(message); + if (id == null) { + return element.getMessageID(); + } else { + return id; + } + } + + private Long getID(Message message) { + return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY); + } + + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 2c1319bcd4..e9042a9913 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.critical.CriticalComponent; public interface Queue extends Bindable,CriticalComponent { @@ -77,7 +78,7 @@ public interface Queue extends Bindable,CriticalComponent { * If the idSupplier returns {@literal < 0} the ID is considered a non value (null) and it will be ignored. * * @see org.apache.activemq.artemis.utils.collections.LinkedList#setNodeStore(NodeStore) */ - MessageReference removeWithSuppliedID(String serverID, long id, NodeStore nodeStore); + MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory nodeStore); /** * The queue definition could be durable, but the messages could eventually be considered non durable. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2ff193595b..9f948355d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -116,6 +116,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -219,9 +220,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private NodeStore nodeStore; - private void checkIDSupplier(NodeStore nodeStore) { - if (this.nodeStore != nodeStore) { - this.nodeStore = nodeStore; + private void checkIDSupplier(NodeStoreFactory nodeStoreFactory) { + if (this.nodeStore == null) { + this.nodeStore = nodeStoreFactory.newNodeStore(); messageReferences.setNodeStore(nodeStore); } } @@ -3457,7 +3458,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public synchronized MessageReference removeWithSuppliedID(String serverID, long id, NodeStore nodeStore) { + public synchronized MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory nodeStore) { checkIDSupplier(nodeStore); MessageReference reference = messageReferences.removeWithID(serverID, id); if (reference != null) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java index 90cd723c48..09c96e8745 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java @@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; -import org.apache.activemq.artemis.utils.collections.NodeStore; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.apache.activemq.artemis.utils.critical.CriticalCloseable; import org.junit.Assert; @@ -157,7 +157,7 @@ public class RoutingContextTest { } @Override - public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore nodeStore) { + public MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory nodeStore) { return null; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 5e43510f8c..e4cd2acbe6 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -56,8 +56,8 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.UUID; -import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.slf4j.Logger; @@ -868,7 +868,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore nodeStore) { + public MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory nodeStore) { return null; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java index 841c8ec1a6..66fafbc3ca 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java @@ -24,10 +24,16 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.Topic; +import java.lang.invoke.MethodHandles; import java.net.URI; import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -59,9 +65,13 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQPReplicaTest extends AmqpClientTestSupport { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected static final int AMQP_PORT_2 = 5673; protected static final int AMQP_PORT_3 = 5674; public static final int TIME_BEFORE_RESTART = 1000; @@ -834,6 +844,8 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { } public Queue locateQueue(ActiveMQServer server, String queueName) throws Exception { + Assert.assertNotNull(queueName); + Assert.assertNotNull(server); Wait.waitFor(() -> server.locateQueue(queueName) != null); return server.locateQueue(queueName); } @@ -1077,4 +1089,213 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { conn.close(); } + private void consumeSubscription(int START_ID, + int LAST_ID, + int port, + String clientID, + String queueName, + String subscriptionName, + boolean assertNull) throws JMSException { + ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port); + Connection conn = cf.createConnection(); + conn.setClientID(clientID); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + + HashSet idsReceived = new HashSet<>(); + + Topic topic = sess.createTopic(queueName); + + MessageConsumer consumer = sess.createDurableConsumer(topic, subscriptionName); + for (int i = START_ID; i <= LAST_ID; i++) { + Message message = consumer.receive(3000); + Assert.assertNotNull(message); + Integer id = message.getIntProperty("i"); + Assert.assertNotNull(id); + Assert.assertTrue(idsReceived.add(id)); + } + + if (assertNull) { + Assert.assertNull(consumer.receiveNoWait()); + } + + for (int i = START_ID; i <= LAST_ID; i++) { + Assert.assertTrue(idsReceived.remove(i)); + } + + Assert.assertTrue(idsReceived.isEmpty()); + conn.close(); + } + + @Test + public void testMulticast() throws Exception { + multiCastReplicaTest(false, false, false, false, true); + } + + + @Test + public void testMulticastSerializeConsumption() throws Exception { + multiCastReplicaTest(false, false, false, false, false); + } + + @Test + public void testMulticastTargetPaging() throws Exception { + multiCastReplicaTest(false, true, false, false, true); + } + + @Test + public void testMulticastTargetSourcePaging() throws Exception { + multiCastReplicaTest(false, true, true, true, true); + } + + @Test + public void testMulticastTargetLargeMessage() throws Exception { + multiCastReplicaTest(true, true, true, true, true); + } + + + private void multiCastReplicaTest(boolean largeMessage, + boolean pagingTarget, + boolean pagingSource, + boolean restartBrokerConnection, boolean multiThreadConsumers) throws Exception { + + String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID(); + final ActiveMQServer server = this.server; + server.setIdentity("targetServer"); + + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + server_2.getConfiguration().setName("thisone"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true); + replica.setName("theReplica"); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + server_2.getConfiguration().setName("server_2"); + + int NUMBER_OF_MESSAGES = 200; + + server_2.start(); + server.start(); + Wait.assertTrue(server_2::isStarted); + Wait.assertTrue(server::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo(getTopicName()).addRoutingType(RoutingType.MULTICAST).setAutoCreated(false)); + + + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getTopicName()); + MessageProducer producer = session.createProducer(topic); + + for (int i = 0; i <= 1; i++) { + // just creating the subscription and not consuming anything + consumeSubscription(0, -1, AMQP_PORT_2, "client" + i, getTopicName(), "subscription" + i, false); + } + + String subs0Name = "client0.subscription0"; + String subs1Name = "client1.subscription1"; + + + Queue subs0Server1 = locateQueue(server, subs0Name); + Queue subs1Server1 = locateQueue(server, subs1Name); + Assert.assertNotNull(subs0Server1); + Assert.assertNotNull(subs1Server1); + + Queue subs0Server2 = locateQueue(server_2, subs0Name); + Queue subs1Server2 = locateQueue(server_2, subs1Name); + Assert.assertNotNull(subs0Server2); + Assert.assertNotNull(subs1Server2); + + if (pagingTarget) { + subs0Server1.getPagingStore().startPaging(); + } + + if (pagingSource) { + subs0Server2.getPagingStore().startPaging(); + } + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + Message message = session.createTextMessage(getText(largeMessage, i)); + message.setIntProperty("i", i); + producer.send(message); + } + + if (pagingTarget) { + subs0Server1.getPagingStore().startPaging(); + } + + Queue snfreplica = server_2.locateQueue(replica.getMirrorSNF()); + + Assert.assertNotNull(snfreplica); + + Wait.assertEquals(0, snfreplica::getMessageCount); + + Wait.assertEquals(NUMBER_OF_MESSAGES, subs0Server1::getMessageCount, 2000); + Wait.assertEquals(NUMBER_OF_MESSAGES, subs1Server1::getMessageCount, 2000); + + Wait.assertEquals(NUMBER_OF_MESSAGES, subs0Server2::getMessageCount, 2000); + Wait.assertEquals(NUMBER_OF_MESSAGES, subs1Server2::getMessageCount, 2000); + + if (restartBrokerConnection) { + // stop and start the broker connection, making sure we wouldn't duplicate the mirror + server_2.stopBrokerConnection(brokerConnectionName); + Thread.sleep(1000); + server_2.startBrokerConnection(brokerConnectionName); + } + + Assert.assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF())); + + if (pagingTarget) { + assertTrue(subs0Server1.getPagingStore().isPaging()); + assertTrue(subs1Server1.getPagingStore().isPaging()); + } + + ExecutorService executorService = Executors.newFixedThreadPool(2); + runAfter(executorService::shutdownNow); + + CountDownLatch done = new CountDownLatch(2); + AtomicInteger errors = new AtomicInteger(0); + + for (int i = 0; i <= 1; i++) { + CountDownLatch threadDone = new CountDownLatch(1); + int subscriptionID = i; + executorService.execute(() -> { + try { + consumeSubscription(0, NUMBER_OF_MESSAGES - 1, AMQP_PORT_2, "client" + subscriptionID, getTopicName(), "subscription" + subscriptionID, false); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + done.countDown(); + threadDone.countDown(); + } + }); + if (!multiThreadConsumers) { + threadDone.await(1, TimeUnit.MINUTES); + } + } + + Assert.assertTrue(done.await(60, TimeUnit.SECONDS)); + Assert.assertEquals(0, errors.get()); + + // Replica is async, so we need to wait acks to arrive before we finish consuming there + Wait.assertEquals(0, snfreplica::getMessageCount); + Wait.assertEquals(0L, subs0Server1::getMessageCount, 2000, 100); + Wait.assertEquals(0L, subs1Server1::getMessageCount, 2000, 100); + Wait.assertEquals(0L, subs0Server2::getMessageCount, 2000, 100); + Wait.assertEquals(0L, subs1Server2::getMessageCount, 2000, 100); + + + if (largeMessage) { + validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 0); + validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(), 0); + } + } + + + } diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 79990fa897..680fc2497a 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -1366,6 +1366,38 @@ + + test-compile + create-test-Mirror1 + + create + + + amq + admin + admin + true + true + ${basedir}/target/mirrored-subscriptions/broker1 + ${basedir}/target/classes/servers/mirrored-subscriptions/broker1 + + + + test-compile + create-test-Mirror2 + + create + + + amq + admin + admin + true + true + ${basedir}/target/mirrored-subscriptions/broker2 + ${basedir}/target/classes/servers/mirrored-subscriptions/broker2 + + diff --git a/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml new file mode 100644 index 0000000000..763558fec9 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml @@ -0,0 +1,226 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + false + + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300 + + + + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + 100K + 1 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+
+ + + + + +
+
+ +
+
+ +
+ +
+ +
+
diff --git a/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml new file mode 100644 index 0000000000..33c8a7ca3a --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml @@ -0,0 +1,220 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + false + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + + + + + + + + + + + tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300 + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + 100K + 1 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+
+ + + + + +
+
+ +
+
+ +
+ +
+ +
+
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java new file mode 100644 index 0000000000..69242281a9 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.brokerConnection; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import java.lang.invoke.MethodHandles; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.smoke.common.SimpleManagement; +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MirroredSubscriptionTest extends SmokeTestBase { + + public static final String SERVER_NAME_A = "mirrored-subscriptions/broker1"; + public static final String SERVER_NAME_B = "mirrored-subscriptions/broker2"; + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + // Change this to true to generate a print-data in certain cases on this test + private static final boolean PRINT_DATA = false; + private static final String JMX_SERVER_HOSTNAME = "localhost"; + private static final int JMX_SERVER_PORT = 11099; + + Process processB; + Process processA; + + @Before + public void beforeClass() throws Exception { + cleanupData(SERVER_NAME_A); + cleanupData(SERVER_NAME_B); + processB = startServer(SERVER_NAME_B, 1, 0); + processA = startServer(SERVER_NAME_A, 0, 0); + + ServerUtil.waitForServerToStart(1, "B", "B", 30000); + ServerUtil.waitForServerToStart(0, "A", "A", 30000); + } + + @Test + public void testSend() throws Throwable { + + int COMMIT_INTERVAL = 100; + int NUMBER_OF_MESSAGES = 500; + int CLIENTS = 2; + String mainURI = "tcp://localhost:61616"; + String secondURI = "tcp://localhost:61617"; + + String topicName = "myTopic"; + + ConnectionFactory cf = CFUtil.createConnectionFactory("amqp", mainURI); + + for (int i = 0; i < CLIENTS; i++) { + try (Connection connection = cf.createConnection()) { + connection.setClientID("client" + i); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(topicName); + session.createDurableSubscriber(topic, "subscription" + i); + } + } + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(topicName); + MessageProducer producer = session.createProducer(topic); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session.createTextMessage("hello " + i)); + if (i % COMMIT_INTERVAL == 0) { + session.commit(); + } + } + session.commit(); + } + + Map result = SimpleManagement.listQueues(mainURI, null, null, 100); + result.entrySet().forEach(entry -> System.out.println("Queue " + entry.getKey() + "=" + entry.getValue())); + + checkMessages(NUMBER_OF_MESSAGES, CLIENTS, mainURI, secondURI); + + ExecutorService executorService = Executors.newFixedThreadPool(CLIENTS); + runAfter(executorService::shutdownNow); + + CountDownLatch done = new CountDownLatch(CLIENTS); + AtomicInteger errors = new AtomicInteger(0); + + for (int i = 0; i < CLIENTS; i++) { + final int clientID = i; + executorService.execute(() -> { + try (Connection connection = cf.createConnection()) { + connection.setClientID("client" + clientID); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(topicName); + TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subscription" + clientID); + for (int messageI = 0; messageI < NUMBER_OF_MESSAGES; messageI++) { + TextMessage message = (TextMessage) subscriber.receive(5000); + Assert.assertNotNull(message); + if (messageI % COMMIT_INTERVAL == 0) { + session.commit(); + } + } + session.commit(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + done.countDown(); + } + }); + } + + Assert.assertTrue(done.await(300, TimeUnit.SECONDS)); + Assert.assertEquals(0, errors.get()); + checkMessages(0, CLIENTS, mainURI, secondURI); + } + + private void checkMessages(int NUMBER_OF_MESSAGES, int CLIENTS, String mainURI, String secondURI) throws Exception { + for (int i = 0; i < CLIENTS; i++) { + final int clientID = i; + Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(mainURI, "client" + clientID + ".subscription" + clientID)); + Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(secondURI, "client" + clientID + ".subscription" + clientID)); + } + } + + int getMessageCount(String uri, String queueName) throws Exception { + Map result = SimpleManagement.listQueues(uri, null, null, 100); + Integer resultReturn = result.get(queueName); + + logger.debug("Result = {}, queueName={}, returnValue = {}", result, queueName, resultReturn); + return resultReturn == null ? 0 : resultReturn; + + } + +} diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java new file mode 100644 index 0000000000..5b9a34b2df --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.common; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.JsonUtil; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.json.JsonArray; +import org.apache.activemq.artemis.json.JsonObject; + +public class SimpleManagement { + + private static final String SIMPLE_OPTIONS = "{\"field\":\"\",\"value\":\"\",\"operation\":\"\"}"; + + /** Simple management function that will return a list of Pair */ + public static Map listQueues(String uri, String user, String password, int maxRows) throws Exception { + Map queues = new HashMap<>(); + ManagementHelper.doManagement(uri, user, password, t -> setupListQueue(t, maxRows), t -> listQueueResult(t, queues), SimpleManagement::failed); + return queues; + } + + private static void setupListQueue(ClientMessage m, int maxRows) throws Exception { + ManagementHelper.putOperationInvocation(m, "broker", "listQueues", SIMPLE_OPTIONS, 1, maxRows); + } + + private static void listQueueResult(ClientMessage message, Map mapQueues) throws Exception { + + final String result = (String) ManagementHelper.getResult(message, String.class); + + + JsonObject queuesAsJsonObject = JsonUtil.readJsonObject(result); + JsonArray array = queuesAsJsonObject.getJsonArray("data"); + + for (int i = 0; i < array.size(); i++) { + JsonObject object = array.getJsonObject(i); + String name = object.getString("name"); + String messageCount = object.getString("messageCount"); + mapQueues.put(name, Integer.parseInt(messageCount)); + } + + } + + private static void failed(ClientMessage message) throws Exception { + + final String result = (String) ManagementHelper.getResult(message, String.class); + + throw new Exception("Failed " + result); + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index bb779ac2a0..cd30f4b054 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -40,8 +40,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ReferenceCounter; -import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; @@ -143,7 +143,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore nodeStore) { + public MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory nodeStore) { return null; }