mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-12 21:15:27 +00:00
ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages
(cherry picked from commit 0866a2eb8846284d2b865ef876b146cd80270f59)
This commit is contained in:
parent
b471392872
commit
7e00a701fe
@ -112,7 +112,15 @@ public class EmbedMessageUtil {
|
|||||||
private static Message extractLargeMessage(ICoreMessage message, StorageManager storageManager) {
|
private static Message extractLargeMessage(ICoreMessage message, StorageManager storageManager) {
|
||||||
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(message.getBytesProperty(AMQP_ENCODE_PROPERTY));
|
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(message.getBytesProperty(AMQP_ENCODE_PROPERTY));
|
||||||
|
|
||||||
return readEncoded(message, storageManager, buffer);
|
Message largeMessageReturn = readEncoded(message, storageManager, buffer);
|
||||||
|
|
||||||
|
if (message instanceof LargeServerMessage && largeMessageReturn instanceof LargeServerMessage) {
|
||||||
|
LargeServerMessage returnMessage = (LargeServerMessage) largeMessageReturn;
|
||||||
|
LargeServerMessage sourceMessage = (LargeServerMessage) message;
|
||||||
|
returnMessage.setPendingRecordID(sourceMessage.getPendingRecordID());
|
||||||
|
}
|
||||||
|
|
||||||
|
return largeMessageReturn;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean checkSignature(final ActiveMQBuffer buffer) {
|
private static boolean checkSignature(final ActiveMQBuffer buffer) {
|
||||||
|
@ -123,6 +123,8 @@ public class AMQPLargeMessageOverCoreBridgeTest extends AmqpClientTestSupport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
sendTextMessages(AMQP_PORT + 1, getQueueName(useDivert ? 0 : 1), largeText.toString(), 10);
|
sendTextMessages(AMQP_PORT + 1, getQueueName(useDivert ? 0 : 1), largeText.toString(), 10);
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
receiveTextMessages(AMQP_PORT, getQueueName(2), largeText.toString(), 10);
|
receiveTextMessages(AMQP_PORT, getQueueName(2), largeText.toString(), 10);
|
||||||
if (useDivert) {
|
if (useDivert) {
|
||||||
// We diverted, so messages were copied, we need to make sure we consume from the original queue
|
// We diverted, so messages were copied, we need to make sure we consume from the original queue
|
||||||
|
@ -36,14 +36,15 @@ public class ClusteredLargeMessageTest extends SmokeTestBase {
|
|||||||
public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
|
public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
|
||||||
public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";
|
public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";
|
||||||
|
|
||||||
|
Process server0Process;
|
||||||
Process server1Process;
|
Process server1Process;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void before() throws Exception {
|
public void before() throws Exception {
|
||||||
cleanupData(SERVER_NAME_0);
|
cleanupData(SERVER_NAME_0);
|
||||||
cleanupData(SERVER_NAME_1);
|
cleanupData(SERVER_NAME_1);
|
||||||
server1Process = startServer(SERVER_NAME_0, 0, 30000);
|
server0Process = startServer(SERVER_NAME_0, 0, 30000);
|
||||||
startServer(SERVER_NAME_1, 100, 30000);
|
server1Process = startServer(SERVER_NAME_1, 100, 30000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -96,5 +97,78 @@ public class ClusteredLargeMessageTest extends SmokeTestBase {
|
|||||||
connection1.close();
|
connection1.close();
|
||||||
connection2.close();
|
connection2.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKillWhileSendingLargeCORE() throws Exception {
|
||||||
|
testKillWhileSendingLarge("CORE");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKillWhileSendingLargeAMQP() throws Exception {
|
||||||
|
testKillWhileSendingLarge("AMQP");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testKillWhileSendingLarge(String protocol) throws Exception {
|
||||||
|
|
||||||
|
ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61716");
|
||||||
|
Connection keepConsumerConnection = server2CF.createConnection();
|
||||||
|
Session keepConsumerSession = keepConsumerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
// a consumer that we should keep to induce message redistribution
|
||||||
|
MessageConsumer keepConsumer = keepConsumerSession.createConsumer(keepConsumerSession.createQueue("testQueue"));
|
||||||
|
|
||||||
|
String largeBody;
|
||||||
|
{
|
||||||
|
StringBuffer largeBodyBuffer = new StringBuffer();
|
||||||
|
while (largeBodyBuffer.length() < 1024 * 1024) {
|
||||||
|
largeBodyBuffer.append("This is large ");
|
||||||
|
}
|
||||||
|
largeBody = largeBodyBuffer.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
int NMESSAGES = 10;
|
||||||
|
|
||||||
|
ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
|
||||||
|
try (Connection connection1 = server1CF.createConnection()) {
|
||||||
|
Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue queue1 = session1.createQueue("testQueue");
|
||||||
|
MessageProducer producer1 = session1.createProducer(queue1);
|
||||||
|
for (int i = 0; i < NMESSAGES; i++) {
|
||||||
|
TextMessage message = session1.createTextMessage(largeBody);
|
||||||
|
message.setStringProperty("i", Integer.toString(i));
|
||||||
|
producer1.send(message);
|
||||||
|
|
||||||
|
if (i == 5) {
|
||||||
|
session1.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
session1.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
keepConsumerConnection.close();
|
||||||
|
server1Process.destroyForcibly();
|
||||||
|
server1Process = startServer(SERVER_NAME_1, 100, 0);
|
||||||
|
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
// retrying the connection until the server is up
|
||||||
|
try (Connection willbegone = server2CF.createConnection()) {
|
||||||
|
break;
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try (Connection connection2 = server2CF.createConnection()) {
|
||||||
|
Session session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue2 = session2.createQueue("testQueue");
|
||||||
|
MessageConsumer consumer2 = session2.createConsumer(queue2);
|
||||||
|
connection2.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < NMESSAGES; i++) {
|
||||||
|
TextMessage message = (TextMessage) consumer2.receive(5000);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
Assert.assertEquals(largeBody, message.getText());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user