ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages

This commit is contained in:
Clebert Suconic 2022-11-17 15:34:44 -05:00 committed by clebertsuconic
parent 3a13a7850c
commit 0866a2eb88
3 changed files with 87 additions and 3 deletions

View File

@ -112,7 +112,15 @@ public class EmbedMessageUtil {
private static Message extractLargeMessage(ICoreMessage message, StorageManager storageManager) {
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(message.getBytesProperty(AMQP_ENCODE_PROPERTY));
return readEncoded(message, storageManager, buffer);
Message largeMessageReturn = readEncoded(message, storageManager, buffer);
if (message instanceof LargeServerMessage && largeMessageReturn instanceof LargeServerMessage) {
LargeServerMessage returnMessage = (LargeServerMessage) largeMessageReturn;
LargeServerMessage sourceMessage = (LargeServerMessage) message;
returnMessage.setPendingRecordID(sourceMessage.getPendingRecordID());
}
return largeMessageReturn;
}
private static boolean checkSignature(final ActiveMQBuffer buffer) {

View File

@ -123,6 +123,8 @@ public class AMQPLargeMessageOverCoreBridgeTest extends AmqpClientTestSupport {
}
sendTextMessages(AMQP_PORT + 1, getQueueName(useDivert ? 0 : 1), largeText.toString(), 10);
server.stop();
server.start();
receiveTextMessages(AMQP_PORT, getQueueName(2), largeText.toString(), 10);
if (useDivert) {
// We diverted, so messages were copied, we need to make sure we consume from the original queue

View File

@ -36,14 +36,15 @@ public class ClusteredLargeMessageTest extends SmokeTestBase {
public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";
Process server0Process;
Process server1Process;
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
cleanupData(SERVER_NAME_1);
server1Process = startServer(SERVER_NAME_0, 0, 30000);
startServer(SERVER_NAME_1, 100, 30000);
server0Process = startServer(SERVER_NAME_0, 0, 30000);
server1Process = startServer(SERVER_NAME_1, 100, 30000);
}
@Test
@ -96,5 +97,78 @@ public class ClusteredLargeMessageTest extends SmokeTestBase {
connection1.close();
connection2.close();
}
@Test
public void testKillWhileSendingLargeCORE() throws Exception {
testKillWhileSendingLarge("CORE");
}
@Test
public void testKillWhileSendingLargeAMQP() throws Exception {
testKillWhileSendingLarge("AMQP");
}
public void testKillWhileSendingLarge(String protocol) throws Exception {
ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61716");
Connection keepConsumerConnection = server2CF.createConnection();
Session keepConsumerSession = keepConsumerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// a consumer that we should keep to induce message redistribution
MessageConsumer keepConsumer = keepConsumerSession.createConsumer(keepConsumerSession.createQueue("testQueue"));
String largeBody;
{
StringBuffer largeBodyBuffer = new StringBuffer();
while (largeBodyBuffer.length() < 1024 * 1024) {
largeBodyBuffer.append("This is large ");
}
largeBody = largeBodyBuffer.toString();
}
int NMESSAGES = 10;
ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection1 = server1CF.createConnection()) {
Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
Queue queue1 = session1.createQueue("testQueue");
MessageProducer producer1 = session1.createProducer(queue1);
for (int i = 0; i < NMESSAGES; i++) {
TextMessage message = session1.createTextMessage(largeBody);
message.setStringProperty("i", Integer.toString(i));
producer1.send(message);
if (i == 5) {
session1.commit();
}
}
session1.commit();
}
keepConsumerConnection.close();
server1Process.destroyForcibly();
server1Process = startServer(SERVER_NAME_1, 100, 0);
for (int i = 0; i < 100; i++) {
// retrying the connection until the server is up
try (Connection willbegone = server2CF.createConnection()) {
break;
} catch (Exception ignored) {
Thread.sleep(100);
}
}
try (Connection connection2 = server2CF.createConnection()) {
Session session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue2 = session2.createQueue("testQueue");
MessageConsumer consumer2 = session2.createConsumer(queue2);
connection2.start();
for (int i = 0; i < NMESSAGES; i++) {
TextMessage message = (TextMessage) consumer2.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(largeBody, message.getText());
}
}
}
}