ARTEMIS-4366 Missing Mirrored ACKs with MULTICAST and subscriptions

This commit is contained in:
Clebert Suconic 2023-07-14 16:49:31 -04:00 committed by clebertsuconic
parent 350ede619d
commit 677d71b8e7
19 changed files with 1107 additions and 72 deletions

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.cli.commands;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
@ -28,25 +27,13 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
public abstract class AbstractAction extends ConnectionAbstract {
// TODO: This call could be replaced by a direct call into ManagementHelpr.doManagement and their lambdas
public void performCoreManagement(ManagementCallback<ClientMessage> cb) throws Exception {
try (ActiveMQConnectionFactory factory = createCoreConnectionFactory();
ServerLocator locator = factory.getServerLocator();
ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
session.start();
ClientRequestor requestor = new ClientRequestor(session, "activemq.management");
ClientMessage message = session.createMessage(false);
cb.setUpInvocation(message);
ClientMessage reply = requestor.request(message);
if (ManagementHelper.hasOperationSucceeded(reply)) {
cb.requestSuccessful(reply);
} else {
cb.requestFailed(reply);
}
ManagementHelper.doManagement(session, cb::setUpInvocation, cb::requestSuccessful, cb::requestFailed);
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
public interface NodeStoreFactory<E> {
NodeStore<E> newNodeStore();
}

View File

@ -16,6 +16,13 @@
*/
package org.apache.activemq.artemis.api.core.management;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@ -87,6 +94,36 @@ public final class ManagementHelper {
public static final SimpleString HDR_CLIENT_ID = new SimpleString("_AMQ_Client_ID");
// Lambda declaration for management function. Pretty much same thing as java.util.function.Consumer but with an exception in the declaration that was needed.
public interface MessageAcceptor {
void accept(ClientMessage message) throws Exception;
}
/** Utility function to connect to a server and perform a management operation via core. */
public static void doManagement(String uri, String user, String password, MessageAcceptor setup, MessageAcceptor ok, MessageAcceptor failed) throws Exception {
try (ServerLocator locator = ServerLocatorImpl.newLocator(uri);
ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
doManagement(session, setup, ok, failed);
}
}
/** Utility function to reuse a ClientSessionConnection and perform a single management operation via core. */
public static void doManagement(ClientSession session, MessageAcceptor setup, MessageAcceptor ok, MessageAcceptor failed) throws Exception {
session.start();
ClientRequestor requestor = new ClientRequestor(session, "activemq.management");
ClientMessage message = session.createMessage(false);
setup.accept(message);
ClientMessage reply = requestor.request(message);
if (ManagementHelper.hasOperationSucceeded(reply)) {
ok.accept(reply);
} else {
failed.accept(reply);
}
}
/**
* Stores a resource attribute in a message to retrieve the value from the server resource.

View File

@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStore;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStoreFactory;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
@ -75,7 +75,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
// We must use one referenceIDSupplier per server.
// protocol manager is the perfect aggregation for that.
private ReferenceNodeStore referenceIDSupplier;
private ReferenceNodeStoreFactory referenceIDSupplier;
private final ProtonProtocolManagerFactory factory;
@ -125,11 +125,11 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
routingHandler = new AMQPRoutingHandler(server);
}
public synchronized ReferenceNodeStore getReferenceIDSupplier() {
public synchronized ReferenceNodeStoreFactory getReferenceIDSupplier() {
if (referenceIDSupplier == null) {
// we lazy start the instance.
// only create it when needed
referenceIDSupplier = new ReferenceNodeStore(server);
referenceIDSupplier = new ReferenceNodeStoreFactory(server);
}
return referenceIDSupplier;
}

View File

@ -92,7 +92,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
final Queue snfQueue;
final ActiveMQServer server;
final ReferenceNodeStore idSupplier;
final ReferenceNodeStoreFactory idSupplier;
final boolean acks;
final boolean addQueues;
final boolean deleteQueues;
@ -324,14 +324,14 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
}
}
public static void validateProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
public static void validateProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
setProtocolData(referenceIDSupplier, ref);
}
}
/** This method will return the brokerID used by the message */
private static String setProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref) {
private static String setProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref) {
String brokerID = referenceIDSupplier.getServerID(ref);
long id = referenceIDSupplier.getID(ref);

View File

@ -162,7 +162,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
DuplicateIDCache lruduplicateIDCache;
String lruDuplicateIDKey;
private final ReferenceNodeStore referenceNodeStore;
private final ReferenceNodeStoreFactory referenceNodeStore;
OperationContext mirrorContext;
@ -367,15 +367,17 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}). Ref={}", nodeID, messageID, targetQueue.getName(), reference);
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", nodeID, messageID, targetQueue.getName());
}
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (reference == null) {
if (logger.isDebugEnabled()) {
logger.debug("Retrying Reference not found on messageID={}, nodeID={}, currentRetry={}", messageID, nodeID, retry);
logger.debug("Retrying Reference not found on messageID={}, nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry);
}
switch (retry) {
case 0:
@ -404,7 +406,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
if (reference != null) {
if (logger.isTraceEnabled()) {
logger.trace("Post ack Server {} worked well for messageID={} nodeID={}", server, messageID, nodeID);
logger.trace("Post ack Server {} worked well for messageID={} nodeID={} queue={}, targetQueue={}", server, messageID, nodeID, reference.getQueue(), targetQueue);
}
try {
switch (reason) {

View File

@ -20,20 +20,16 @@ import java.util.HashMap;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
public class ReferenceNodeStore implements NodeStore<MessageReference> {
private final String serverID;
private final ReferenceNodeStoreFactory factory;
public ReferenceNodeStore(ActiveMQServer server) {
this.serverID = server.getNodeID().toString();
public ReferenceNodeStore(ReferenceNodeStoreFactory factory) {
this.factory = factory;
}
// This is where the messages are stored by server id...
@ -43,10 +39,6 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> lruMap;
public String getDefaultNodeID() {
return serverID;
}
@Override
public void storeNode(MessageReference element, LinkedListImpl.Node<MessageReference> node) {
String list = getServerID(element);
@ -90,7 +82,7 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
/** notice getMap should always return an instance. It should never return null. */
private synchronized LongObjectHashMap<LinkedListImpl.Node<MessageReference>> getMap(String serverID) {
if (serverID == null) {
serverID = this.serverID; // returning for the localList in case it's null
serverID = factory.getDefaultNodeID();
}
if (lruListID != null && lruListID.equals(serverID)) {
@ -113,34 +105,15 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
}
public String getServerID(MessageReference element) {
return getServerID(element.getMessage());
return factory.getServerID(element);
}
public String getServerID(Message message) {
Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
if (nodeID != null) {
return nodeID.toString();
} else {
// it is important to return null here, as the MirrorSource is expecting it to be null
// in the case the nodeID being from the originating server.
// don't be tempted to return this.serverID here.
return null;
}
return factory.getServerID(message);
}
public long getID(MessageReference element) {
Message message = element.getMessage();
Long id = getID(message);
if (id == null) {
return element.getMessageID();
} else {
return id;
}
}
private Long getID(Message message) {
return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
return factory.getID(element);
}
@Override

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageReference> {
final ActiveMQServer server;
private final String serverID;
public ReferenceNodeStoreFactory(ActiveMQServer server) {
this.server = server;
this.serverID = server.getNodeID().toString();
}
@Override
public NodeStore<MessageReference> newNodeStore() {
return new ReferenceNodeStore(this);
}
public String getDefaultNodeID() {
return serverID;
}
public String getServerID(MessageReference element) {
return getServerID(element.getMessage());
}
public String getServerID(Message message) {
Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
if (nodeID != null) {
return nodeID.toString();
} else {
// it is important to return null here, as the MirrorSource is expecting it to be null
// in the case the nodeID being from the originating server.
// don't be tempted to return this.serverID here.
return null;
}
}
public long getID(MessageReference element) {
Message message = element.getMessage();
Long id = getID(message);
if (id == null) {
return element.getMessageID();
} else {
return id;
}
}
private Long getID(Message message) {
return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
}
}

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.critical.CriticalComponent;
public interface Queue extends Bindable,CriticalComponent {
@ -77,7 +78,7 @@ public interface Queue extends Bindable,CriticalComponent {
* If the idSupplier returns {@literal < 0} the ID is considered a non value (null) and it will be ignored.
*
* @see org.apache.activemq.artemis.utils.collections.LinkedList#setNodeStore(NodeStore) */
MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore);
MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore);
/**
* The queue definition could be durable, but the messages could eventually be considered non durable.

View File

@ -116,6 +116,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -219,9 +220,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private NodeStore<MessageReference> nodeStore;
private void checkIDSupplier(NodeStore<MessageReference> nodeStore) {
if (this.nodeStore != nodeStore) {
this.nodeStore = nodeStore;
private void checkIDSupplier(NodeStoreFactory<MessageReference> nodeStoreFactory) {
if (this.nodeStore == null) {
this.nodeStore = nodeStoreFactory.newNodeStore();
messageReferences.setNodeStore(nodeStore);
}
}
@ -3457,7 +3458,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public synchronized MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) {
public synchronized MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore) {
checkIDSupplier(nodeStore);
MessageReference reference = messageReferences.removeWithID(serverID, id);
if (reference != null) {

View File

@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalCloseable;
import org.junit.Assert;
@ -157,7 +157,7 @@ public class RoutingContextTest {
}
@Override
public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) {
public MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore) {
return null;
}

View File

@ -56,8 +56,8 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.slf4j.Logger;
@ -868,7 +868,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) {
public MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore) {
return null;
}

View File

@ -24,10 +24,16 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -59,9 +65,13 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQPReplicaTest extends AmqpClientTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected static final int AMQP_PORT_2 = 5673;
protected static final int AMQP_PORT_3 = 5674;
public static final int TIME_BEFORE_RESTART = 1000;
@ -834,6 +844,8 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
public Queue locateQueue(ActiveMQServer server, String queueName) throws Exception {
Assert.assertNotNull(queueName);
Assert.assertNotNull(server);
Wait.waitFor(() -> server.locateQueue(queueName) != null);
return server.locateQueue(queueName);
}
@ -1077,4 +1089,213 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
conn.close();
}
private void consumeSubscription(int START_ID,
int LAST_ID,
int port,
String clientID,
String queueName,
String subscriptionName,
boolean assertNull) throws JMSException {
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port);
Connection conn = cf.createConnection();
conn.setClientID(clientID);
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
HashSet<Integer> idsReceived = new HashSet<>();
Topic topic = sess.createTopic(queueName);
MessageConsumer consumer = sess.createDurableConsumer(topic, subscriptionName);
for (int i = START_ID; i <= LAST_ID; i++) {
Message message = consumer.receive(3000);
Assert.assertNotNull(message);
Integer id = message.getIntProperty("i");
Assert.assertNotNull(id);
Assert.assertTrue(idsReceived.add(id));
}
if (assertNull) {
Assert.assertNull(consumer.receiveNoWait());
}
for (int i = START_ID; i <= LAST_ID; i++) {
Assert.assertTrue(idsReceived.remove(i));
}
Assert.assertTrue(idsReceived.isEmpty());
conn.close();
}
@Test
public void testMulticast() throws Exception {
multiCastReplicaTest(false, false, false, false, true);
}
@Test
public void testMulticastSerializeConsumption() throws Exception {
multiCastReplicaTest(false, false, false, false, false);
}
@Test
public void testMulticastTargetPaging() throws Exception {
multiCastReplicaTest(false, true, false, false, true);
}
@Test
public void testMulticastTargetSourcePaging() throws Exception {
multiCastReplicaTest(false, true, true, true, true);
}
@Test
public void testMulticastTargetLargeMessage() throws Exception {
multiCastReplicaTest(true, true, true, true, true);
}
private void multiCastReplicaTest(boolean largeMessage,
boolean pagingTarget,
boolean pagingSource,
boolean restartBrokerConnection, boolean multiThreadConsumers) throws Exception {
String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID();
final ActiveMQServer server = this.server;
server.setIdentity("targetServer");
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("server_2");
server_2.getConfiguration().setName("thisone");
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true);
replica.setName("theReplica");
amqpConnection.addElement(replica);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.getConfiguration().setName("server_2");
int NUMBER_OF_MESSAGES = 200;
server_2.start();
server.start();
Wait.assertTrue(server_2::isStarted);
Wait.assertTrue(server::isStarted);
// We create the address to avoid auto delete on the queue
server_2.addAddressInfo(new AddressInfo(getTopicName()).addRoutingType(RoutingType.MULTICAST).setAutoCreated(false));
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i <= 1; i++) {
// just creating the subscription and not consuming anything
consumeSubscription(0, -1, AMQP_PORT_2, "client" + i, getTopicName(), "subscription" + i, false);
}
String subs0Name = "client0.subscription0";
String subs1Name = "client1.subscription1";
Queue subs0Server1 = locateQueue(server, subs0Name);
Queue subs1Server1 = locateQueue(server, subs1Name);
Assert.assertNotNull(subs0Server1);
Assert.assertNotNull(subs1Server1);
Queue subs0Server2 = locateQueue(server_2, subs0Name);
Queue subs1Server2 = locateQueue(server_2, subs1Name);
Assert.assertNotNull(subs0Server2);
Assert.assertNotNull(subs1Server2);
if (pagingTarget) {
subs0Server1.getPagingStore().startPaging();
}
if (pagingSource) {
subs0Server2.getPagingStore().startPaging();
}
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage(getText(largeMessage, i));
message.setIntProperty("i", i);
producer.send(message);
}
if (pagingTarget) {
subs0Server1.getPagingStore().startPaging();
}
Queue snfreplica = server_2.locateQueue(replica.getMirrorSNF());
Assert.assertNotNull(snfreplica);
Wait.assertEquals(0, snfreplica::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, subs0Server1::getMessageCount, 2000);
Wait.assertEquals(NUMBER_OF_MESSAGES, subs1Server1::getMessageCount, 2000);
Wait.assertEquals(NUMBER_OF_MESSAGES, subs0Server2::getMessageCount, 2000);
Wait.assertEquals(NUMBER_OF_MESSAGES, subs1Server2::getMessageCount, 2000);
if (restartBrokerConnection) {
// stop and start the broker connection, making sure we wouldn't duplicate the mirror
server_2.stopBrokerConnection(brokerConnectionName);
Thread.sleep(1000);
server_2.startBrokerConnection(brokerConnectionName);
}
Assert.assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF()));
if (pagingTarget) {
assertTrue(subs0Server1.getPagingStore().isPaging());
assertTrue(subs1Server1.getPagingStore().isPaging());
}
ExecutorService executorService = Executors.newFixedThreadPool(2);
runAfter(executorService::shutdownNow);
CountDownLatch done = new CountDownLatch(2);
AtomicInteger errors = new AtomicInteger(0);
for (int i = 0; i <= 1; i++) {
CountDownLatch threadDone = new CountDownLatch(1);
int subscriptionID = i;
executorService.execute(() -> {
try {
consumeSubscription(0, NUMBER_OF_MESSAGES - 1, AMQP_PORT_2, "client" + subscriptionID, getTopicName(), "subscription" + subscriptionID, false);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
done.countDown();
threadDone.countDown();
}
});
if (!multiThreadConsumers) {
threadDone.await(1, TimeUnit.MINUTES);
}
}
Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
Assert.assertEquals(0, errors.get());
// Replica is async, so we need to wait acks to arrive before we finish consuming there
Wait.assertEquals(0, snfreplica::getMessageCount);
Wait.assertEquals(0L, subs0Server1::getMessageCount, 2000, 100);
Wait.assertEquals(0L, subs1Server1::getMessageCount, 2000, 100);
Wait.assertEquals(0L, subs0Server2::getMessageCount, 2000, 100);
Wait.assertEquals(0L, subs1Server2::getMessageCount, 2000, 100);
if (largeMessage) {
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 0);
validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(), 0);
}
}
}

View File

@ -1366,6 +1366,38 @@
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-test-Mirror1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>admin</user>
<password>admin</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>true</noWeb>
<instance>${basedir}/target/mirrored-subscriptions/broker1</instance>
<configuration>${basedir}/target/classes/servers/mirrored-subscriptions/broker1</configuration>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-test-Mirror2</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>admin</user>
<password>admin</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>true</noWeb>
<instance>${basedir}/target/mirrored-subscriptions/broker2</instance>
<configuration>${basedir}/target/classes/servers/mirrored-subscriptions/broker2</configuration>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>

View File

@ -0,0 +1,226 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<message-expiry-scan-period>1000</message-expiry-scan-period>
<security-enabled>false</security-enabled>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size>
-->
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- useKQueue means: it will use Netty kqueue if you are on a system (MacOS) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor>
</acceptors>
<broker-connections>
<amqp-connection uri="tcp://localhost:61617" name="mirror" retry-interval="100">
<mirror/>
</amqp-connection>
</broker-connections>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<address-setting match="myQueue">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<default-max-consumers>1</default-max-consumers>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<address-setting match="myTopic">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<default-max-consumers>1</default-max-consumers>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<address-setting match="myTopicPaging">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>100K</max-size-bytes>
<default-max-consumers>1</default-max-consumers>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
<address name="myQueue">
<anycast>
<!-- this should be maxed from the default -->
<queue name="myQueue">
</queue>
</anycast>
</address>
<address name="myTopic">
<multicast/>
</address>
<address name="myTopicPaging">
<multicast/>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,220 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<message-expiry-scan-period>1000</message-expiry-scan-period>
<security-enabled>false</security-enabled>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size>
-->
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- useKQueue means: it will use Netty kqueue if you are on a system (MacOS) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">
tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300
</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<address-setting match="myQueue">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<default-max-consumers>1</default-max-consumers>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<address-setting match="myTopic">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<default-max-consumers>1</default-max-consumers>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<address-setting match="myTopicPaging">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>100K</max-size-bytes>
<default-max-consumers>1</default-max-consumers>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
<address name="myQueue">
<anycast>
<!-- this should be maxed from the default -->
<queue name="myQueue">
</queue>
</anycast>
</address>
<address name="myTopic">
<multicast/>
</address>
<address name="myTopicPaging">
<multicast/>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.smoke.common.SimpleManagement;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MirroredSubscriptionTest extends SmokeTestBase {
public static final String SERVER_NAME_A = "mirrored-subscriptions/broker1";
public static final String SERVER_NAME_B = "mirrored-subscriptions/broker2";
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Change this to true to generate a print-data in certain cases on this test
private static final boolean PRINT_DATA = false;
private static final String JMX_SERVER_HOSTNAME = "localhost";
private static final int JMX_SERVER_PORT = 11099;
Process processB;
Process processA;
@Before
public void beforeClass() throws Exception {
cleanupData(SERVER_NAME_A);
cleanupData(SERVER_NAME_B);
processB = startServer(SERVER_NAME_B, 1, 0);
processA = startServer(SERVER_NAME_A, 0, 0);
ServerUtil.waitForServerToStart(1, "B", "B", 30000);
ServerUtil.waitForServerToStart(0, "A", "A", 30000);
}
@Test
public void testSend() throws Throwable {
int COMMIT_INTERVAL = 100;
int NUMBER_OF_MESSAGES = 500;
int CLIENTS = 2;
String mainURI = "tcp://localhost:61616";
String secondURI = "tcp://localhost:61617";
String topicName = "myTopic";
ConnectionFactory cf = CFUtil.createConnectionFactory("amqp", mainURI);
for (int i = 0; i < CLIENTS; i++) {
try (Connection connection = cf.createConnection()) {
connection.setClientID("client" + i);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(topicName);
session.createDurableSubscriber(topic, "subscription" + i);
}
}
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("hello " + i));
if (i % COMMIT_INTERVAL == 0) {
session.commit();
}
}
session.commit();
}
Map<String, Integer> result = SimpleManagement.listQueues(mainURI, null, null, 100);
result.entrySet().forEach(entry -> System.out.println("Queue " + entry.getKey() + "=" + entry.getValue()));
checkMessages(NUMBER_OF_MESSAGES, CLIENTS, mainURI, secondURI);
ExecutorService executorService = Executors.newFixedThreadPool(CLIENTS);
runAfter(executorService::shutdownNow);
CountDownLatch done = new CountDownLatch(CLIENTS);
AtomicInteger errors = new AtomicInteger(0);
for (int i = 0; i < CLIENTS; i++) {
final int clientID = i;
executorService.execute(() -> {
try (Connection connection = cf.createConnection()) {
connection.setClientID("client" + clientID);
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(topicName);
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subscription" + clientID);
for (int messageI = 0; messageI < NUMBER_OF_MESSAGES; messageI++) {
TextMessage message = (TextMessage) subscriber.receive(5000);
Assert.assertNotNull(message);
if (messageI % COMMIT_INTERVAL == 0) {
session.commit();
}
}
session.commit();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
done.countDown();
}
});
}
Assert.assertTrue(done.await(300, TimeUnit.SECONDS));
Assert.assertEquals(0, errors.get());
checkMessages(0, CLIENTS, mainURI, secondURI);
}
private void checkMessages(int NUMBER_OF_MESSAGES, int CLIENTS, String mainURI, String secondURI) throws Exception {
for (int i = 0; i < CLIENTS; i++) {
final int clientID = i;
Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(mainURI, "client" + clientID + ".subscription" + clientID));
Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(secondURI, "client" + clientID + ".subscription" + clientID));
}
}
int getMessageCount(String uri, String queueName) throws Exception {
Map<String, Integer> result = SimpleManagement.listQueues(uri, null, null, 100);
Integer resultReturn = result.get(queueName);
logger.debug("Result = {}, queueName={}, returnValue = {}", result, queueName, resultReturn);
return resultReturn == null ? 0 : resultReturn;
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.common;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
public class SimpleManagement {
private static final String SIMPLE_OPTIONS = "{\"field\":\"\",\"value\":\"\",\"operation\":\"\"}";
/** Simple management function that will return a list of Pair<Name of Queue, Number of Messages> */
public static Map<String, Integer> listQueues(String uri, String user, String password, int maxRows) throws Exception {
Map<String, Integer> queues = new HashMap<>();
ManagementHelper.doManagement(uri, user, password, t -> setupListQueue(t, maxRows), t -> listQueueResult(t, queues), SimpleManagement::failed);
return queues;
}
private static void setupListQueue(ClientMessage m, int maxRows) throws Exception {
ManagementHelper.putOperationInvocation(m, "broker", "listQueues", SIMPLE_OPTIONS, 1, maxRows);
}
private static void listQueueResult(ClientMessage message, Map<String, Integer> mapQueues) throws Exception {
final String result = (String) ManagementHelper.getResult(message, String.class);
JsonObject queuesAsJsonObject = JsonUtil.readJsonObject(result);
JsonArray array = queuesAsJsonObject.getJsonArray("data");
for (int i = 0; i < array.size(); i++) {
JsonObject object = array.getJsonObject(i);
String name = object.getString("name");
String messageCount = object.getString("messageCount");
mapQueues.put(name, Integer.parseInt(messageCount));
}
}
private static void failed(ClientMessage message) throws Exception {
final String result = (String) ManagementHelper.getResult(message, String.class);
throw new Exception("Failed " + result);
}
}

View File

@ -40,8 +40,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
@ -143,7 +143,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) {
public MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore) {
return null;
}