ARTEMIS-4309 Read all bytes of compressed objmsg

Continually read from the compressed byte[] into
the decompressed object

Add test to validate large (>1024 bytes) compressed data can be
deserialized properly
This commit is contained in:
Peter Brady 2023-05-28 09:39:06 -04:00 committed by clebertsuconic
parent c96a074b53
commit 2f5463c960
2 changed files with 24 additions and 1 deletions

View File

@ -267,7 +267,7 @@ public final class OpenWireMessageConverter {
int n = ois.read(buf);
while (n != -1) {
decompressed.write(buf, 0, n);
n = ois.read();
n = ois.read(buf);
}
//read done
return decompressed.toByteSequence();

View File

@ -38,6 +38,7 @@ import org.junit.Test;
public class CompressedInteropTest extends BasicOpenWireTest {
private static final String TEXT;
private static final String LARGE_TEXT;
static {
StringBuilder builder = new StringBuilder();
@ -46,6 +47,7 @@ public class CompressedInteropTest extends BasicOpenWireTest {
builder.append("The quick red fox jumped over the lazy brown dog. ");
}
TEXT = builder.toString();
LARGE_TEXT = TEXT + TEXT + TEXT + TEXT + TEXT;
}
@Before
@ -90,6 +92,9 @@ public class CompressedInteropTest extends BasicOpenWireTest {
//ObjectMessage
sendCompressedObjectMessageUsingOpenWire();
receiveObjectMessage(useCore);
//Large ObjectMessage
sendCompressedLargeObjectMessageUsingOpenWire();
receiveLargeObjectMessage(useCore);
}
private void sendCompressedStreamMessageUsingOpenWire() throws Exception {
@ -164,6 +169,24 @@ public class CompressedInteropTest extends BasicOpenWireTest {
assertEquals(TEXT, objectVal);
}
private void sendCompressedLargeObjectMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(LARGE_TEXT);
producer.send(objectMessage);
}
private void receiveLargeObjectMessage(boolean useCore) throws Exception {
ObjectMessage objectMessage = (ObjectMessage) receiveMessage(useCore);
Object objectVal = objectMessage.getObject();
assertEquals(LARGE_TEXT, objectVal);
}
private void sendCompressedMapMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);