ARTEMIS-1975 Fixing encodesize cached on AMQP Large Message

Encoding ahead is broken with AMQP Large Message
This is still part of ARTEMIS-1975
This commit is contained in:
Clebert Suconic 2020-03-24 15:47:23 -04:00
parent 265c8f054d
commit 31c945f8b0
2 changed files with 19 additions and 6 deletions

View File

@ -75,17 +75,25 @@ public class JournalAddRecordTX extends JournalInternalRecord {
buffer.writeLong(id);
buffer.writeInt(persister.getEncodeSize(record));
int persisterEncodeSize = persister.getEncodeSize(record);
int encodeSize = getInternalEncodeSize(persisterEncodeSize);
buffer.writeInt(persisterEncodeSize);
buffer.writeByte(recordType);
persister.encode(buffer, record);
buffer.writeInt(getEncodeSize());
// AMQP Persister may save encoding between getEcodeSize and .encode(). After encode it may release the encoding
buffer.writeInt(encodeSize);
}
@Override
public int getEncodeSize() {
return JournalImpl.SIZE_ADD_RECORD_TX + persister.getEncodeSize(record) + 1;
return getInternalEncodeSize(persister.getEncodeSize(record));
}
private int getInternalEncodeSize(int persisterEncodeSize) {
return JournalImpl.SIZE_ADD_RECORD_TX + persisterEncodeSize + 1;
}
}

View File

@ -181,7 +181,7 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
@Test
public void testSendWithPropertiesNonPersistent() throws Exception {
testSendWithPropertiesAndFilter(true, false);
testSendWithPropertiesAndFilter(true, true);
}
@ -204,10 +204,16 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
assertEquals(0, queueView.getMessageCount());
session.begin();
int oddID = 0;
for (int m = 0; m < 10; m++) {
AmqpMessage message = new AmqpMessage();
message.setDurable(persistent);
message.setApplicationProperty("odd", (m % 2 == 0));
boolean odd = (m % 2 == 0);
message.setApplicationProperty("odd", odd);
if (odd) {
message.setApplicationProperty("oddID", oddID++);
}
byte[] bytes = new byte[size];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte) 'z';
@ -286,7 +292,6 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
jmsTest(persistent, tx);
}
@Test
public void testJMSNonPersistentTX() throws Exception {