ARTEMIS-4558 Idempotent Mirrored ACKs

Mirror acks should be performed atomically with the storage of the source ACK. Both the send of the ack and the recording of the ack should be part of the same transaction (in case of transactional).

We are also adding support on transactions for an afterWired callback for the proper plug of OperationContext sync.
This commit is contained in:
Clebert Suconic 2024-01-05 16:48:11 -05:00 committed by clebertsuconic
parent 1887b3fb8e
commit bc7e4639e0
7 changed files with 737 additions and 49 deletions

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@ -65,6 +66,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-amq-mr-adr");
public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id");
public static final SimpleString BROKER_ID_SIMPLE_STRING = SimpleString.toSimpleString(BROKER_ID.toString());
// Events:
public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress");
@ -154,7 +156,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
if (addQueues) {
Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, null, addressInfo.toJSON());
route(server, message);
routeMirrorCommand(server, message);
}
}
@ -170,7 +172,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
}
if (deleteQueues) {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
route(server, message);
routeMirrorCommand(server, message);
}
}
@ -193,7 +195,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
}
if (addQueues) {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
route(server, message);
routeMirrorCommand(server, message);
}
}
@ -213,15 +215,34 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
if (deleteQueues) {
Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString());
route(server, message);
routeMirrorCommand(server, message);
}
}
private boolean invalidTarget(MirrorController controller, Message message) {
if (controller == null) {
return false;
}
String remoteID = getRemoteMirrorId();
if (remoteID == null) {
// This is to avoid a reflection (Miror sendin messages back to itself) from a small period of time one node reconnects but not the opposite direction.
Object localRemoteID = message.getAnnotation(BROKER_ID_SIMPLE_STRING);
if (localRemoteID != null) {
remoteID = String.valueOf(localRemoteID);
logger.debug("Remote link is not initialized yet, setting remoteID from message as {}", remoteID);
}
}
return sameNode(remoteID, controller.getRemoteMirrorId());
}
private boolean invalidTarget(MirrorController controller) {
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
}
private boolean ignoreAddress(SimpleString address) {
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
return true;
}
return !addressFilter.match(address);
}
@ -238,7 +259,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return;
}
if (invalidTarget(context.getMirrorSource())) {
if (invalidTarget(context.getMirrorSource(), message)) {
logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server);
return;
}
@ -444,13 +465,14 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
MirrorACKOperation operation = getAckOperation(tx);
// notice the operationContext.replicationLineUp is done on beforeCommit as part of the TX
operation.addMessage(messageCommand, ref);
routeMirrorCommand(server, messageCommand, tx);
} else {
server.getStorageManager().afterStoreOperations(new IOCallback() {
@Override
public void done() {
try {
logger.debug("preAcknowledge::afterStoreOperation for messageReference {}", ref);
route(server, messageCommand);
routeMirrorCommand(server, messageCommand);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
@ -469,7 +491,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
logger.trace("getAckOperation::setting operation on transaction {}", tx);
ackOperation = new MirrorACKOperation(server);
tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, ackOperation);
tx.afterStore(ackOperation);
tx.afterWired(ackOperation);
}
return ackOperation;
@ -490,7 +512,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return sendOperation;
}
private static class MirrorACKOperation extends TransactionOperationAbstract {
private static class MirrorACKOperation implements Runnable {
final ActiveMQServer server;
@ -511,47 +533,19 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
}
@Override
public void beforeCommit(Transaction tx) {
logger.debug("MirrorACKOperation::beforeCommit processing {}", acks);
acks.forEach(this::doBeforeCommit);
public void run() {
logger.debug("MirrorACKOperation::wired processing {}", acks);
acks.forEach(this::doWired);
}
// callback to be used on forEach
private void doBeforeCommit(Message ack, MessageReference ref) {
private void doWired(Message ack, MessageReference ref) {
OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class);
if (context != null) {
context.replicationLineUp();
}
}
@Override
public void afterCommit(Transaction tx) {
logger.debug("MirrorACKOperation::afterCommit processing {}", acks);
acks.forEach(this::doAfterCommit);
}
// callback to be used on forEach
private void doAfterCommit(Message ack, MessageReference ref) {
try {
route(server, ack);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
ref.getMessage().usageDown();
}
@Override
public void afterRollback(Transaction tx) {
acks.forEach(this::doAfterRollback);
}
// callback to be used on forEach
private void doAfterRollback(Message ack, MessageReference ref) {
OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class);
if (context != null) {
context.replicationDone();
}
}
}
@ -609,10 +603,17 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return AMQPMirrorMessageFactory.createMessage(snfQueue.getAddress().toString(), address, queue, event, brokerID, body, ackReason);
}
public static void route(ActiveMQServer server, Message message) throws Exception {
public static void routeMirrorCommand(ActiveMQServer server, Message message) throws Exception {
routeMirrorCommand(server, message, null);
}
public static void routeMirrorCommand(ActiveMQServer server, Message message, Transaction tx) throws Exception {
message.setMessageID(server.getStorageManager().generateID());
RoutingContext ctx = mirrorControlRouting.get();
ctx.clear().setMirrorOption(MirrorOption.disabled);
// it is important to use local only at the source to avoid having the message strictly load balancing
// to other nodes if the SNF queue has the same name as the one on this node.
ctx.clear().setMirrorOption(MirrorOption.disabled).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY).setTransaction(tx);
logger.debug("SetTX {}", tx);
server.getPostOffice().route(message, ctx, false);
}

View File

@ -448,7 +448,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
targetQueue.expire(reference, null, false);
break;
default:
targetQueue.acknowledge(null, reference, reason, null, false);
TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true);
targetQueue.acknowledge(transaction, reference, reason, null, false);
transaction.commit();
break;
}
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
@ -470,7 +472,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
String internalMirrorID = (String) deliveryAnnotations.getValue().get(BROKER_ID);
if (internalMirrorID == null) {
internalMirrorID = getRemoteMirrorId(); // not pasisng the ID means the data was generated on the remote broker
internalMirrorID = getRemoteMirrorId(); // not passing the ID means the data was generated on the remote broker
}
Long internalIDLong = (Long) deliveryAnnotations.getValue().get(INTERNAL_ID);
String internalAddress = (String) deliveryAnnotations.getValue().get(INTERNAL_DESTINATION);
@ -516,7 +518,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
message.setAddress(internalAddress);
}
final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager());
final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAsync(true);
transaction.addOperation(messageCompletionAck.tx);
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
@ -588,7 +590,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
if (reference == null) {
return false;
} else {
targetQueue.acknowledge(null, reference, AckReason.NORMAL, null, false);
TransactionImpl tx = new TransactionImpl(server.getStorageManager()).setAsync(true);
targetQueue.acknowledge(tx, reference, AckReason.NORMAL, null, false);
tx.commit();
OperationContextImpl.getContext().executeOnCompletion(operation);
return true;
}

View File

@ -1306,5 +1306,69 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
@Test
public void testSimpleReplicaTX() throws Exception {
String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID();
server.setIdentity("targetServer");
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("server_2");
server_2.getConfiguration().setName("thisone");
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true);
replica.setName("theReplica");
amqpConnection.addElement(replica);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.getConfiguration().setName("server_2");
int NUMBER_OF_MESSAGES = 10;
server_2.start();
Wait.assertTrue(server_2::isStarted);
// We create the address to avoid auto delete on the queue
server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage(getText(true, i));
message.setIntProperty("i", i);
producer.send(message);
}
session.commit();
Queue queueOnServer1 = locateQueue(server, getQueueName());
Queue snfreplica = server_2.locateQueue(replica.getMirrorSNF());
Assert.assertNotNull(snfreplica);
Wait.assertEquals(0, snfreplica::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount, 2000);
Queue queueOnServer2 = locateQueue(server_2, getQueueName());
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
connection.start();
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message m = consumer.receive(1000);
Assert.assertNotNull(m);
}
session.commit();
Wait.assertEquals(0, snfreplica::getMessageCount);
Wait.assertEquals(0, queueOnServer1::getMessageCount);
Wait.assertEquals(0, queueOnServer2::getMessageCount);
}
}

View File

@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
@ -385,6 +386,7 @@ public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
}
private interface StorageCallback {
void storage(boolean isUpdate,
boolean isCommit,
long txID,
@ -424,7 +426,7 @@ public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
Object record,
boolean sync,
IOCompletion callback) throws Exception {
storageCallback.storage(false, false, -1, id, recordType, persister, record);
storageCallback.storage(false, false, -1, id, recordType, persister, record);
super.appendAddRecord(id, recordType, persister, record, sync, callback);
}
@ -451,7 +453,7 @@ public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
boolean sync,
IOCompletion callback,
boolean lineUpContext) throws Exception {
storageCallback.storage(false, true, txID, txID, (byte)0, null, null);
storageCallback.storage(false, true, txID, txID, (byte) 0, null, null);
super.appendCommitRecord(txID, sync, callback, lineUpContext);
}
@ -471,4 +473,130 @@ public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
}
};
}
@Test
public void testSimpleACK_TX_AMQP() throws Exception {
testSimpleAckSync("AMQP", true, false, 1024);
}
@Test
public void testSimpleACK_TX_CORE() throws Exception {
testSimpleAckSync("CORE", true, false, 1024);
}
@Test
public void testSimpleACK_NoTX_AMQP() throws Exception {
testSimpleAckSync("AMQP", false, false, 1024);
}
@Test
public void testSimpleACK_NoTX_CORE() throws Exception {
testSimpleAckSync("CORE", false, false, 1024);
}
@Test
public void testSimpleACK_NoTX_CORE_Large() throws Exception {
testSimpleAckSync("CORE", false, false, 255 * 1024);
}
@Test
public void testSimpleACK_TX_CORE_Large() throws Exception {
testSimpleAckSync("CORE", true, false, 255 * 1024);
}
@Test
public void testSimple_Core_Individual_Large() throws Exception {
testSimpleAckSync("CORE", false, true, 255 * 1024);
}
@Test
public void testSimple_Core_Individual() throws Exception {
testSimpleAckSync("CORE", false, true, 1024);
}
public void testSimpleAckSync(final String protocol, final boolean tx, final boolean individualAck, int messageSize) throws Exception {
AtomicInteger errors = new AtomicInteger(0);
final int NUMBER_OF_MESSAGES = 10;
slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
});
slowServer.setIdentity("slowServer");
server.setIdentity("server");
ExecutorService pool = Executors.newFixedThreadPool(5);
runAfter(pool::shutdown);
configureMirrorTowardsSlow(server);
slowServer.getConfiguration().setName("slow");
server.getConfiguration().setName("fast");
slowServer.start();
server.start();
waitForServerToStart(slowServer);
waitForServerToStart(server);
server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
Queue replicatedQueue = slowServer.locateQueue(getQueueName());
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + AMQP_PORT);
if (factory instanceof ActiveMQConnectionFactory) {
((ActiveMQConnectionFactory) factory).getServerLocator().setBlockOnAcknowledge(true);
}
Connection connection = factory.createConnection();
runAfter(connection::close);
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
connection.start();
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
final String bodyMessage;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < messageSize; i++) {
buffer.append("large Buffer...");
}
bodyMessage = buffer.toString();
}
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
int theI = i;
TextMessage message = session.createTextMessage(bodyMessage);
message.setStringProperty("strProperty", "" + theI);
producer.send(message);
Wait.assertEquals(i + 1, replicatedQueue::getMessageCount, 5000);
}
Wait.assertEquals(NUMBER_OF_MESSAGES, replicatedQueue::getMessageCount);
connection.start();
Session clientSession = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : (individualAck ? ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE));
MessageConsumer consumer = clientSession.createConsumer(clientSession.createQueue(getQueueName()));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
message.acknowledge();
if (tx) {
clientSession.commit();
}
Wait.assertEquals(NUMBER_OF_MESSAGES - i - 1, replicatedQueue::getMessageCount, 5000);
}
Assert.assertEquals(0, errors.get());
}
}

View File

@ -94,7 +94,7 @@ public class MirrorControllerBasicTest extends ActiveMQTestBase {
server.createQueue(new QueueConfiguration("test").setAddress("test").setRoutingType(RoutingType.ANYCAST));
Message message = AMQPMirrorMessageFactory.createMessage("test", SimpleString.toSimpleString("ad1"), SimpleString.toSimpleString("qu1"), "test", "someUID", "body-test", AckReason.KILLED);
AMQPMirrorControllerSource.route(server, message);
AMQPMirrorControllerSource.routeMirrorCommand(server, message);
AmqpClient client = new AmqpClient(new URI("tcp://localhost:61616"), null, null);
AmqpConnection connection = client.connect();

View File

@ -0,0 +1,313 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IdempotentACKTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static String largeBody;
static {
StringWriter writer = new StringWriter();
while (writer.getBuffer().length() < 1024 * 1024) {
writer.append("This is a large string ..... ");
}
largeBody = writer.toString();
}
private static final String QUEUE_NAME = "myQueue";
public static final String DC1_NODE_A = "idempotentMirror/DC1";
public static final String DC2_NODE_A = "idempotentMirror/DC2";
Process processDC1_node_A;
Process processDC2_node_A;
private static String DC1_NODEA_URI = "tcp://localhost:61616";
private static String DC2_NODEA_URI = "tcp://localhost:61618";
private static void createServer(String serverName,
String connectionName,
String mirrorURI,
int porOffset) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setClustered(false);
cliCreateServer.setNoWeb(true);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A);
cliCreateServer.addArgs("--queues", QUEUE_NAME);
cliCreateServer.setPortOffset(porOffset);
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
}
@BeforeClass
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2);
}
private void startServers() throws Exception {
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
ServerUtil.waitForServerToStart(2, 10_000);
}
@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC2_NODE_A);
}
private void transactSend(Session session, MessageProducer producer, int initialCounter, int numberOfMessages, int largeMessageFactor) throws Throwable {
try {
for (int i = initialCounter; i < initialCounter + numberOfMessages; i++) {
TextMessage message;
String unique = "Unique " + i;
if (i % largeMessageFactor == 0) {
message = session.createTextMessage(largeBody);
message.setBooleanProperty("large", true);
} else {
message = session.createTextMessage("this is small");
message.setBooleanProperty("large", false);
}
message.setIntProperty("i", i);
message.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), unique);
producer.send(message);
}
session.commit();
} catch (JMSException e) {
if (e instanceof TransactionRolledBackException && e.getMessage().contains("Duplicate message detected")) {
logger.debug("OK Exception {}", e.getMessage(), e);
return; // ok
} else {
logger.warn("Not OK Exception {}", e.getMessage(), e);
throw e;
}
}
}
@Test
public void testAMQP() throws Exception {
testACKs("AMQP");
}
@Test
public void testCORE() throws Exception {
testACKs("CORE");
}
private void testACKs(final String protocol) throws Exception {
startServers();
final int consumers = 10;
final int numberOfMessages = 1000;
final int largeMessageFactor = 30;
final int messagesPerConsumer = 30;
// Just a reminder: if you change number on this test, this needs to be true:
Assert.assertEquals("Invalid test config", 0, numberOfMessages % consumers);
AtomicBoolean running = new AtomicBoolean(true);
runAfter(() -> running.set(false));
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
ExecutorService executor = Executors.newFixedThreadPool(consumers);
runAfter(executor::shutdownNow);
final ConnectionFactory connectionFactoryDC1A;
connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI);
CountDownLatch sendDone = new CountDownLatch(1);
CountDownLatch killSend = new CountDownLatch(1);
executor.execute(() -> {
int messagesSent = 0;
while (running.get() && messagesSent < numberOfMessages) {
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
if (messagesSent < 100) {
transactSend(session, producer, messagesSent, 1, 1);
messagesSent++;
logger.debug("Sent {}", messagesSent);
if (messagesSent == 100) {
logger.debug("Signal to kill");
killSend.countDown();
}
} else {
transactSend(session, producer, messagesSent, 100, largeMessageFactor);
messagesSent += 100;
logger.debug("Sent {}", messagesSent);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
try {
Thread.sleep(100);
} catch (Throwable ignored) {
}
}
}
sendDone.countDown();
});
Assert.assertTrue(killSend.await(50, TimeUnit.SECONDS));
restartDC1_ServerA();
Assert.assertTrue(sendDone.await(50, TimeUnit.SECONDS));
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue));
Wait.assertEquals(numberOfMessages, () -> getCount(simpleManagementDC1A, QUEUE_NAME));
Wait.assertEquals(numberOfMessages, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
CountDownLatch latchKill = new CountDownLatch(consumers);
CountDownLatch latchDone = new CountDownLatch(consumers);
Runnable runnableConsumer = () -> {
int messagesConsumed = 0;
while (running.get() && messagesConsumed < messagesPerConsumer) {
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
while (messagesConsumed < messagesPerConsumer) {
Message message = consumer.receive(100);
if (message instanceof TextMessage) {
logger.debug("message received={}", message);
session.commit();
messagesConsumed++;
logger.debug("Received {}", messagesConsumed);
if (messagesConsumed == 10) {
latchKill.countDown();
}
} else {
logger.info("no messages...");
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
try {
Thread.sleep(100);
} catch (Throwable ignored) {
}
}
}
latchDone.countDown();
};
for (int i = 0; i < consumers; i++) {
executor.execute(runnableConsumer);
}
Assert.assertTrue(latchKill.await(10, TimeUnit.SECONDS));
restartDC1_ServerA();
Assert.assertTrue(latchDone.await(4, TimeUnit.MINUTES));
long flushedMessages = 0;
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
while (consumer.receive(500) != null) {
flushedMessages++;
}
session.commit();
}
logger.debug("Flushed {}", flushedMessages);
// after all flushed messages, we should have 0 messages on both nodes
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
}
private void restartDC1_ServerA() throws Exception {
processDC1_node_A.destroyForcibly();
Assert.assertTrue(processDC1_node_A.waitFor(10, TimeUnit.SECONDS));
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
}
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.debug("count on queue {} is {}", queue, value);
return value;
}
}

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.brokerConnection.sender;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SenderSoakTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static String largeBody;
private static String smallBody = "This is a small body";
{
StringWriter writer = new StringWriter();
while (writer.getBuffer().length() < 1024 * 1024) {
writer.append("This is a large string ..... ");
}
largeBody = writer.toString();
}
public static final String DC1_NODE_A = "sender/DC1/A";
public static final String DC2_NODE_A = "sender/DC2/A";
Process processDC1_node_A;
Process processDC2_node_A;
private static String DC1_NODEA_URI = "tcp://localhost:61616";
private static String DC2_NODEA_URI = "tcp://localhost:61618";
private static void createServer(String serverName, int porOffset) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("STRICT");
cliCreateServer.setClustered(false);
cliCreateServer.setNoWeb(true);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A);
cliCreateServer.addArgs("--addresses", "order");
cliCreateServer.addArgs("--queues", "myQueue");
cliCreateServer.setPortOffset(porOffset);
cliCreateServer.createServer();
}
public static void createServers(boolean useMirror) throws Exception {
createServer(DC1_NODE_A, 0);
if (useMirror) {
Properties brokerProperties = new Properties();
brokerProperties.put("AMQPConnections.sender.uri", "tcp://localhost:61618");
brokerProperties.put("AMQPConnections.sender.retryInterval", "100");
brokerProperties.put("AMQPConnections.sender.connectionElements.sender.type", "MIRROR");
brokerProperties.put("largeMessageSync", "false");
File brokerPropertiesFile = new File(getServerLocation(DC1_NODE_A), "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
} else {
Properties brokerProperties = new Properties();
brokerProperties.put("AMQPConnections.sender.uri", "tcp://localhost:61618");
brokerProperties.put("AMQPConnections.sender.retryInterval", "100");
brokerProperties.put("AMQPConnections.sender.connectionElements.sender.type", "SENDER");
brokerProperties.put("AMQPConnections.sender.connectionElements.sender.queueName", "myQueue");
brokerProperties.put("largeMessageSync", "false");
File brokerPropertiesFile = new File(getServerLocation(DC1_NODE_A), "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
}
createServer(DC2_NODE_A, 2);
}
private void startServers() throws Exception {
processDC2_node_A = startServer(DC2_NODE_A, -1, -1);
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
ServerUtil.waitForServerToStart(2, 10_000);
}
@Test
public void testMirror() throws Exception {
testSender(true);
}
@Test
public void testSender() throws Exception {
testSender(false);
}
public void testSender(boolean mirror) throws Exception {
createServers(mirror);
startServers();
final int numberOfMessages = 1000;
Assert.assertTrue("numberOfMessages must be even", numberOfMessages % 2 == 0);
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61618");
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message;
boolean large;
if (i % 1 == 10) {
message = session.createTextMessage(largeBody);
large = true;
} else {
message = session.createTextMessage(smallBody);
large = false;
}
message.setIntProperty("i", i);
message.setBooleanProperty("large", large);
producer.send(message);
if (i % 100 == 0) {
logger.debug("commit {}", i);
session.commit();
}
}
session.commit();
}
logger.debug("All messages were sent");
try (Connection connection = connectionFactoryDC2A.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
logger.debug("Received message {}, large={}", message.getIntProperty("i"), message.getBooleanProperty("large"));
}
session.commit();
}
}
}