ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed

(cherry picked from commit f2e0f8713f)
This commit is contained in:
Clebert Suconic 2022-11-10 07:51:30 -05:00
parent 58f201bbe3
commit 60c5b9871e
2 changed files with 37 additions and 14 deletions

View File

@ -459,7 +459,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
} }
pos += numberOfBytesRead; pos += numberOfBytesRead;
} while (pos < minLargeMessageSize); }
while (pos < minLargeMessageSize);
totalSize += pos; totalSize += pos;

View File

@ -2795,27 +2795,36 @@ public class LargeMessageTest extends LargeMessageTestBase {
} }
@Test @Test
public void testStream() throws Exception { public void testStreamedMessage() throws Exception {
testStream(false);
}
@Test
public void testStreamedMessageCompressed() throws Exception {
testStream(true);
}
private void testStream(boolean compressed) throws Exception {
ActiveMQServer server = createServer(true, isNetty(), storeType); ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start(); server.start();
locator.setCompressLargeMessage(true); locator.setCompressLargeMessage(compressed);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(false, false); ClientSession session = sf.createSession(false, false);
final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE"); session.createQueue(new QueueConfiguration(ADDRESS));
server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST)); ClientProducer producer = session.createProducer(ADDRESS);
ClientProducer producer = session.createProducer(MY_QUEUE);
AtomicBoolean closed = new AtomicBoolean(false); AtomicBoolean closed = new AtomicBoolean(false);
final int BYTES = 1_000;
InputStream inputStream = new InputStream() { InputStream inputStream = new InputStream() {
int bytes = 10_000; int bytes = BYTES;
@Override @Override
public int read() throws IOException { public int read() throws IOException {
if (bytes-- > 0) { if (bytes-- > 0) {
@ -2830,12 +2839,10 @@ public class LargeMessageTest extends LargeMessageTestBase {
closed.set(true); closed.set(true);
} }
@Override @Override
public int available () throws IOException { public int available() {
return bytes; return bytes;
} }
}; };
ClientMessage message = session.createMessage(true); ClientMessage message = session.createMessage(true);
@ -2844,11 +2851,26 @@ public class LargeMessageTest extends LargeMessageTestBase {
Wait.assertTrue(closed::get); Wait.assertTrue(closed::get);
session.commit();
session.start();
ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientMessage receivedMessage = consumer.receive(5000);
Assert.assertNotNull(receivedMessage);
ActiveMQBuffer buffer = receivedMessage.getBodyBuffer();
Assert.assertEquals(BYTES, buffer.readableBytes());
for (int i = 0; i < BYTES; i++) {
Assert.assertEquals((byte)10, buffer.readByte());
}
Assert.assertEquals(0, buffer.readableBytes());
session.close(); session.close();
} }
} }