diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java index 190456dca0..bd5ab073d5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java @@ -896,6 +896,88 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { } + @Test + public void testWithTXLargeMessage() throws Exception { + testWithTX(true); + } + + @Test + public void testWithTX() throws Exception { + testWithTX(false); + } + + + private void testWithTX(boolean largeMessage) throws Exception { + + server.setIdentity("server_1"); + server.start(); + + ActiveMQServer server_3 = createServer(AMQP_PORT_3, false); + server_3.setIdentity("server_3"); + server_3.start(); + Wait.assertTrue(server_3::isStarted); + + ConnectionFactory factory_3 = CFUtil.createConnectionFactory("amqp", "tcp://localhost:" + AMQP_PORT_3); + factory_3.createConnection().close(); + + server_2 = createServer(AMQP_PORT_2, false); + + String brokerConnectionOne = "brokerConnection1:" + UUIDGenerator.getInstance().generateStringUUID(); + String brokerConnectionTwo = "brokerConnection2:" + UUIDGenerator.getInstance().generateStringUUID(); + + AMQPBrokerConnectConfiguration amqpConnection1 = new AMQPBrokerConnectConfiguration(brokerConnectionOne, "tcp://localhost:" + AMQP_PORT); + AMQPMirrorBrokerConnectionElement replica1 = new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR); + amqpConnection1.addElement(replica1); + server_2.getConfiguration().addAMQPConnection(amqpConnection1); + + AMQPBrokerConnectConfiguration amqpConnection3 = new AMQPBrokerConnectConfiguration(brokerConnectionTwo, "tcp://localhost:" + AMQP_PORT_3); + AMQPMirrorBrokerConnectionElement replica2 = new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR); + amqpConnection3.addElement(replica2); + server_2.getConfiguration().addAMQPConnection(amqpConnection3); + + int NUMBER_OF_MESSAGES = 5; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2); + Connection connection = factory.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + Queue queue_server_2 = locateQueue(server_2, getQueueName()); + Queue queue_server_1 = locateQueue(server, getQueueName()); + Queue queue_server_3 = locateQueue(server_3, getQueueName()); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + Message message = session.createTextMessage(getText(largeMessage, i)); + message.setIntProperty("i", i); + producer.send(message); + } + session.rollback(); + + // Allowing a window in which message could be sent on the replica + Thread.sleep(100); + + Wait.assertEquals(0, queue_server_2::getMessageCount); + Wait.assertEquals(0, queue_server_3::getMessageCount); + Wait.assertEquals(0, queue_server_1::getMessageCount); + + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + Message message = session.createTextMessage(getText(largeMessage, i)); + message.setIntProperty("i", i); + producer.send(message); + } + session.commit(); + + + Wait.assertEquals(NUMBER_OF_MESSAGES, queue_server_2::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queue_server_3::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queue_server_1::getMessageCount); + } + /** * this might be helpful for debugging */