ARTEMIS-2766 Fixing AMQP Large Messages after Parsing change

This commit is contained in:
Clebert Suconic 2020-05-15 17:57:00 -04:00
parent fd2b869a1a
commit 83c7942a78
3 changed files with 8 additions and 7 deletions

View File

@ -1020,8 +1020,8 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override
public final Object getDuplicateProperty() {
if (applicationProperties == null) {
if (!AMQPMessageSymbolSearch.anyApplicationProperties(getData(), DUPLICATE_ID_NEEDLES)) {
if (applicationProperties == null && messageDataScanned == MessageDataScanningStatus.SCANNED.code && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
if (!AMQPMessageSymbolSearch.anyApplicationProperties(getData(), DUPLICATE_ID_NEEDLES, applicationPropertiesPosition)) {
// no need for duplicate-property
return null;
}

View File

@ -58,17 +58,17 @@ final class AMQPMessageSymbolSearch {
public static boolean anyMessageAnnotations(final ReadableBuffer data, final KMPNeedle[] needles) {
return lookupOnSection(MessageAnnotations.class, data, needles);
return lookupOnSection(MessageAnnotations.class, data, needles, 0);
}
public static boolean anyApplicationProperties(final ReadableBuffer data, final KMPNeedle[] needles) {
return lookupOnSection(ApplicationProperties.class, data, needles);
public static boolean anyApplicationProperties(final ReadableBuffer data, final KMPNeedle[] needles, int startAt) {
return lookupOnSection(ApplicationProperties.class, data, needles, startAt);
}
private static boolean lookupOnSection(final Class section, final ReadableBuffer data, final KMPNeedle[] needles) {
private static boolean lookupOnSection(final Class section, final ReadableBuffer data, final KMPNeedle[] needles, final int startAt) {
DecoderImpl decoder = TLSEncode.getDecoder();
final int position = data.position();
decoder.setBuffer(data.rewind());
decoder.setBuffer(data.position(startAt));
try {
while (data.hasRemaining()) {
TypeConstructor<?> constructor = decoder.readConstructor();

View File

@ -725,6 +725,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
message.getWrappedMessage().setContentType("text/plain");
message.getWrappedMessage().setBody(new Data(new Binary(messageText.getBytes(StandardCharsets.UTF_8))));
//message.setApplicationProperty("_AMQ_DUPL_ID", "11");
sender.send(message);
sender.close();