ARTEMIS-4163 Fixing openwire race while chunkSend is happening

This commit is contained in:
Clebert Suconic 2023-02-09 05:57:50 -05:00 committed by clebertsuconic
parent 446df6d825
commit 69e21a0eb7
2 changed files with 26 additions and 10 deletions

View File

@ -548,12 +548,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
final int bufferSize = bytes.length;
final int maxChunkSize = protocolManager.getOpenwireMaxPacketChunkSize();
if (maxChunkSize > 0 && bufferSize > maxChunkSize) {
chunkSend(bytes, bufferSize, maxChunkSize);
} else {
final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize);
buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
transportConnection.write(buffer, false, false);
// We can't let any other packet to sneak in while chunkSend is happening.
// otherwise we may get wrong packts delivered
synchronized (transportConnection) {
if (maxChunkSize > 0 && bufferSize > maxChunkSize) {
chunkSend(bytes, bufferSize, maxChunkSize);
} else {
final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize);
buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
transportConnection.write(buffer, false, false);
}
}
bufferSent();
} catch (IOException e) {

View File

@ -20,12 +20,14 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -190,7 +192,7 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
@Test
public void testSendReceiveLargeMessageTX() throws Exception {
int NUMBER_OF_MESSAGES = 400;
int NUMBER_OF_MESSAGES = 1000;
int TX_SIZE = 100;
ExecutorService executorService = Executors.newFixedThreadPool(1);
@ -220,8 +222,11 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
Queue queue = session.createQueue(lmAddress.toString());
MessageConsumer consumer = session.createConsumer(queue);
for (int received = 0; received < NUMBER_OF_MESSAGES; received++) {
TextMessage m = (TextMessage) consumer.receive(5000);
assertEquals(largeString, m.getText());
Message m = consumer.receive(5000);
Assert.assertNotNull(m);
if (m instanceof TextMessage) {
assertEquals(largeString, ((TextMessage) m).getText());
}
if (received > 0 && received % TX_SIZE == 0) {
logger.info("Received {} messages", received);
session.commit();
@ -247,7 +252,14 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int sent = 0; sent < NUMBER_OF_MESSAGES; sent++) {
TextMessage message = session.createTextMessage(largeString);
Message message;
if (sent % 2 == 0) {
message = session.createTextMessage(largeString);
} else {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(largeString.getBytes(StandardCharsets.UTF_8));
message = bytesMessage;
}
producer.send(message);
if (sent > 0 && sent % TX_SIZE == 0) {
logger.info("Sent {} messages", sent);