From 69e21a0eb7f7d1a73fc1809c13b683a381841e4c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 9 Feb 2023 05:57:50 -0500 Subject: [PATCH] ARTEMIS-4163 Fixing openwire race while chunkSend is happening --- .../protocol/openwire/OpenWireConnection.java | 16 +++++++++------ .../openwire/OpenWireLargeMessageTest.java | 20 +++++++++++++++---- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index bb22cc48d0..8a710c2aa0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -548,12 +548,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se final int bufferSize = bytes.length; final int maxChunkSize = protocolManager.getOpenwireMaxPacketChunkSize(); - if (maxChunkSize > 0 && bufferSize > maxChunkSize) { - chunkSend(bytes, bufferSize, maxChunkSize); - } else { - final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize); - buffer.writeBytes(bytes.data, bytes.offset, bufferSize); - transportConnection.write(buffer, false, false); + // We can't let any other packet to sneak in while chunkSend is happening. + // otherwise we may get wrong packts delivered + synchronized (transportConnection) { + if (maxChunkSize > 0 && bufferSize > maxChunkSize) { + chunkSend(bytes, bufferSize, maxChunkSize); + } else { + final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize); + buffer.writeBytes(bytes.data, bytes.offset, bufferSize); + transportConnection.write(buffer, false, false); + } } bufferSent(); } catch (IOException e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java index 02e346ad13..e862703544 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java @@ -20,12 +20,14 @@ import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -190,7 +192,7 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { @Test public void testSendReceiveLargeMessageTX() throws Exception { - int NUMBER_OF_MESSAGES = 400; + int NUMBER_OF_MESSAGES = 1000; int TX_SIZE = 100; ExecutorService executorService = Executors.newFixedThreadPool(1); @@ -220,8 +222,11 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { Queue queue = session.createQueue(lmAddress.toString()); MessageConsumer consumer = session.createConsumer(queue); for (int received = 0; received < NUMBER_OF_MESSAGES; received++) { - TextMessage m = (TextMessage) consumer.receive(5000); - assertEquals(largeString, m.getText()); + Message m = consumer.receive(5000); + Assert.assertNotNull(m); + if (m instanceof TextMessage) { + assertEquals(largeString, ((TextMessage) m).getText()); + } if (received > 0 && received % TX_SIZE == 0) { logger.info("Received {} messages", received); session.commit(); @@ -247,7 +252,14 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int sent = 0; sent < NUMBER_OF_MESSAGES; sent++) { - TextMessage message = session.createTextMessage(largeString); + Message message; + if (sent % 2 == 0) { + message = session.createTextMessage(largeString); + } else { + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(largeString.getBytes(StandardCharsets.UTF_8)); + message = bytesMessage; + } producer.send(message); if (sent > 0 && sent % TX_SIZE == 0) { logger.info("Sent {} messages", sent);