ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed

This commit is contained in:
Clebert Suconic 2022-11-10 07:51:30 -05:00 committed by clebertsuconic
parent 9528e45869
commit f2e0f8713f
2 changed files with 37 additions and 14 deletions

View File

@ -459,7 +459,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
}
pos += numberOfBytesRead;
} while (pos < minLargeMessageSize);
}
while (pos < minLargeMessageSize);
totalSize += pos;

View File

@ -2795,27 +2795,36 @@ public class LargeMessageTest extends LargeMessageTestBase {
}
@Test
public void testStream() throws Exception {
public void testStreamedMessage() throws Exception {
testStream(false);
}
@Test
public void testStreamedMessageCompressed() throws Exception {
testStream(true);
}
private void testStream(boolean compressed) throws Exception {
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
locator.setCompressLargeMessage(true);
locator.setCompressLargeMessage(compressed);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(false, false);
final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
session.createQueue(new QueueConfiguration(ADDRESS));
server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session.createProducer(MY_QUEUE);
ClientProducer producer = session.createProducer(ADDRESS);
AtomicBoolean closed = new AtomicBoolean(false);
final int BYTES = 1_000;
InputStream inputStream = new InputStream() {
int bytes = 10_000;
int bytes = BYTES;
@Override
public int read() throws IOException {
if (bytes-- > 0) {
@ -2830,12 +2839,10 @@ public class LargeMessageTest extends LargeMessageTestBase {
closed.set(true);
}
@Override
public int available () throws IOException {
public int available() {
return bytes;
}
};
ClientMessage message = session.createMessage(true);
@ -2844,11 +2851,26 @@ public class LargeMessageTest extends LargeMessageTestBase {
Wait.assertTrue(closed::get);
session.commit();
session.start();
ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientMessage receivedMessage = consumer.receive(5000);
Assert.assertNotNull(receivedMessage);
ActiveMQBuffer buffer = receivedMessage.getBodyBuffer();
Assert.assertEquals(BYTES, buffer.readableBytes());
for (int i = 0; i < BYTES; i++) {
Assert.assertEquals((byte)10, buffer.readByte());
}
Assert.assertEquals(0, buffer.readableBytes());
session.close();
}
}