ARTEMIS-4588 30 second pause for large msgs + federation

Large message support was added to
o.a.a.a.c.s.f.FederatedQueueConsumerImpl#onMessage via cf85d35 for
ARTEMIS-3308. The problem with that change is that when onMessage
returns o.a.a.a.c.c.i.ClientConsumerImpl#callOnMessage will eventually
call o.a.a.a.c.c.i.ClientLargeMessageImpl#discardBody which eventually
ends up in o.a.a.a.c.c.i.LargeMessageControllerImpl#popPacket waiting 30
seconds (i.e. the default readTimeout) for more packets to arrive (which
never do). This happens because the FederatedQueueConsumer short-cuts
the "normal" process by using LargeMessageControllerImpl#take.

This commit fixes that by tracking the number of bytes "taken" and then
looking at that value later when discarding the body effectively
skipping the 30 second wait.
This commit is contained in:
Justin Bertram 2024-02-07 14:52:03 -06:00
parent 9cd598ebf4
commit 77d9f10a3d
2 changed files with 24 additions and 4 deletions

View File

@ -83,6 +83,8 @@ public class LargeMessageControllerImpl implements LargeMessageController {
private long packetLastPosition = -1;
private long bytesTaken = 0;
private OutputStream outStream;
// There's no need to wait a synchronization
@ -315,7 +317,9 @@ public class LargeMessageControllerImpl implements LargeMessageController {
@Override
public LargeData take() throws InterruptedException {
return largeMessageData.take();
LargeData largeData = largeMessageData.take();
bytesTaken += largeData.getChunk().length;
return largeData;
}
/**
@ -1146,6 +1150,10 @@ public class LargeMessageControllerImpl implements LargeMessageController {
}
private void checkForPacket(final long index) {
if (totalSize == bytesTaken) {
return;
}
if (outStream != null) {
throw new IllegalAccessError("Can't read the messageBody after setting outputStream");
}

View File

@ -304,6 +304,15 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testWithLargeMessage() throws Exception {
internalTestWithLargeMessages(1);
}
@Test
public void testWithMultipleLargeMessages() throws Exception {
internalTestWithLargeMessages(5);
}
private void internalTestWithLargeMessages(int messageNumber) throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
@ -319,14 +328,18 @@ public class FederatedQueueTest extends FederatedTestBase {
Session session1 = connection1.createSession();
Queue queue1 = session1.createQueue(queueName);
MessageProducer producer = session1.createProducer(queue1);
producer.send(session1.createTextMessage(payload));
for (int i = 0; i < messageNumber; i++) {
producer.send(session1.createTextMessage(payload));
}
connection0.start();
Session session0 = connection0.createSession();
Queue queue0 = session0.createQueue(queueName);
MessageConsumer consumer0 = session0.createConsumer(queue0);
assertNotNull(consumer0.receive(60000));
for (int i = 0; i < messageNumber; i++) {
assertNotNull(consumer0.receive(1000));
}
}
}
@ -704,5 +717,4 @@ public class FederatedQueueTest extends FederatedTestBase {
return message;
}
}
}