NO-JIRA Adding a non failing test on TX send and mirror

This commit is contained in:
Clebert Suconic 2022-07-13 12:51:48 -04:00
parent ee877e83a6
commit 8a6ee31055
1 changed files with 82 additions and 0 deletions

View File

@ -896,6 +896,88 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
@Test
public void testWithTXLargeMessage() throws Exception {
testWithTX(true);
}
@Test
public void testWithTX() throws Exception {
testWithTX(false);
}
private void testWithTX(boolean largeMessage) throws Exception {
server.setIdentity("server_1");
server.start();
ActiveMQServer server_3 = createServer(AMQP_PORT_3, false);
server_3.setIdentity("server_3");
server_3.start();
Wait.assertTrue(server_3::isStarted);
ConnectionFactory factory_3 = CFUtil.createConnectionFactory("amqp", "tcp://localhost:" + AMQP_PORT_3);
factory_3.createConnection().close();
server_2 = createServer(AMQP_PORT_2, false);
String brokerConnectionOne = "brokerConnection1:" + UUIDGenerator.getInstance().generateStringUUID();
String brokerConnectionTwo = "brokerConnection2:" + UUIDGenerator.getInstance().generateStringUUID();
AMQPBrokerConnectConfiguration amqpConnection1 = new AMQPBrokerConnectConfiguration(brokerConnectionOne, "tcp://localhost:" + AMQP_PORT);
AMQPMirrorBrokerConnectionElement replica1 = new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR);
amqpConnection1.addElement(replica1);
server_2.getConfiguration().addAMQPConnection(amqpConnection1);
AMQPBrokerConnectConfiguration amqpConnection3 = new AMQPBrokerConnectConfiguration(brokerConnectionTwo, "tcp://localhost:" + AMQP_PORT_3);
AMQPMirrorBrokerConnectionElement replica2 = new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR);
amqpConnection3.addElement(replica2);
server_2.getConfiguration().addAMQPConnection(amqpConnection3);
int NUMBER_OF_MESSAGES = 5;
server_2.start();
Wait.assertTrue(server_2::isStarted);
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Queue queue_server_2 = locateQueue(server_2, getQueueName());
Queue queue_server_1 = locateQueue(server, getQueueName());
Queue queue_server_3 = locateQueue(server_3, getQueueName());
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage(getText(largeMessage, i));
message.setIntProperty("i", i);
producer.send(message);
}
session.rollback();
// Allowing a window in which message could be sent on the replica
Thread.sleep(100);
Wait.assertEquals(0, queue_server_2::getMessageCount);
Wait.assertEquals(0, queue_server_3::getMessageCount);
Wait.assertEquals(0, queue_server_1::getMessageCount);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage(getText(largeMessage, i));
message.setIntProperty("i", i);
producer.send(message);
}
session.commit();
Wait.assertEquals(NUMBER_OF_MESSAGES, queue_server_2::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queue_server_3::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queue_server_1::getMessageCount);
}
/**
* this might be helpful for debugging
*/