ARTEMIS-2454 Message Body damaged after re-encoding

This commit is contained in:
Clebert Suconic 2019-08-18 14:11:06 -04:00
parent 349fea888d
commit 5f75f68129
4 changed files with 125 additions and 3 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.utils;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.util.Arrays;
@ -62,7 +64,7 @@ public class ByteUtil {
StringBuffer buffer = new StringBuffer();
int line = 1;
buffer.append("/* 1 */ \"");
buffer.append("/* 0 */ \"");
for (int i = 0; i < str.length(); i += groupSize) {
buffer.append(str.substring(i, i + Math.min(str.length() - i, groupSize)));
@ -72,7 +74,7 @@ public class ByteUtil {
if (line < 10) {
buffer.append(" ");
}
buffer.append(Integer.toString(line) + " */ \"");
buffer.append(Integer.toString(i) + " */ \"");
} else if ((i + groupSize) % groupSize == 0 && str.length() - i > groupSize) {
buffer.append("\" + \"");
}
@ -102,6 +104,30 @@ public class ByteUtil {
return new String(hexChars);
}
/** Simplify reading of a byte array in a programmers understable way */
public static String debugByteArray(byte[] byteArray) {
StringWriter builder = new StringWriter();
PrintWriter writer = new PrintWriter(builder);
for (int i = 0; i < byteArray.length; i++) {
writer.print("\t[" + i + "]=" + ByteUtil.byteToChar(byteArray[i]) + " / " + byteArray[i]);
if (i > 0 && i % 8 == 0) {
writer.println();
} else {
writer.print(" ");
}
}
return builder.toString();
}
public static String byteToChar(byte value) {
char[] hexChars = new char[2];
int v = value & 0xFF;
hexChars[0] = hexArray[v >>> 4];
hexChars[1] = hexArray[v & 0x0F];
return new String(hexChars);
}
public static String bytesToHex(byte[] bytes, int groupSize) {
if (bytes == null) {
return "NULL";

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
@ -295,6 +296,12 @@ public class AMQPMessage extends RefCountMessage {
return scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
}
/** This is different from toString, as this will print an expanded version of the buffer
* in Hex and programmers's readable format */
public String toDebugString() {
return ByteUtil.debugByteArray(data.array());
}
/**
* Retrieves the AMQP Section that composes the body of this message by decoding a
* fresh copy from the encoded message data. Changes to the returned value are not

View File

@ -151,7 +151,7 @@ public class NettyReadable implements ReadableBuffer {
@Override
public int arrayOffset() {
return buffer.arrayOffset() + buffer.readerIndex();
return buffer.arrayOffset();
}
@Override

View File

@ -16,11 +16,13 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@ -37,8 +39,10 @@ import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
@ -47,6 +51,8 @@ import io.netty.buffer.Unpooled;
public class TestConversions extends Assert {
private static final Logger logger = Logger.getLogger(TestConversions.class);
@Test
public void testAmqpValueOfBooleanIsPassedThrough() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
@ -226,6 +232,89 @@ public class TestConversions extends Assert {
assertEquals(text, textMessage.getText());
}
@Test
public void testEditAndConvert() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
ApplicationProperties properties = new ApplicationProperties(mapprop);
properties.getValue().put("hello", "hello");
MessageImpl message = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
message.setMessageAnnotations(annotations);
message.setApplicationProperties(properties);
String text = "someText";
message.setBody(new AmqpValue(text));
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
TypedProperties extraProperties = new TypedProperties();
encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
for (int i = 0; i < 10; i++) {
if (logger.isDebugEnabled()) {
logger.debug("Message encoded :: " + encodedMessage.toDebugString());
}
encodedMessage.messageChanged();
AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
Assert.assertEquals(text, (String)value.getValue());
// this line is needed to force a failure
ICoreMessage coreMessage = encodedMessage.toCore();
if (logger.isDebugEnabled()) {
logger.debug("Converted message: " + coreMessage);
}
}
}
@Test
public void testExpandPropertiesAndConvert() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
ApplicationProperties properties = new ApplicationProperties(mapprop);
properties.getValue().put("hello", "hello");
MessageImpl message = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
message.setMessageAnnotations(annotations);
message.setApplicationProperties(properties);
String text = "someText";
message.setBody(new AmqpValue(text));
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
TypedProperties extraProperties = new TypedProperties();
encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
for (int i = 0; i < 100; i++) {
encodedMessage.getApplicationProperties().getValue().put("another" + i, "value" + i);
encodedMessage.messageChanged();
encodedMessage.reencode();
AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
Assert.assertEquals(text, (String)value.getValue());
ICoreMessage coreMessage = encodedMessage.toCore();
if (logger.isDebugEnabled()) {
logger.debug("Converted message: " + coreMessage);
}
// I'm going to replace the message every 10 messages by a re-encoded version to check if the wiring still acturate.
// I want to mix replacing and not replacing to make sure the re-encoding is not giving me any surprises
if (i > 0 && i % 10 == 0) {
ByteBuf buf = Unpooled.buffer(15 * 1024, 150 * 1024);
encodedMessage.sendBuffer(buf, 1);
byte[] messageBytes = new byte[buf.writerIndex()];
buf.readBytes(messageBytes);
message = (MessageImpl) Message.Factory.create();
message.decode(ByteBuffer.wrap(messageBytes));
// This is replacing the message by the new expanded version
encodedMessage = encodeAndCreateAMQPMessage(message);
}
}
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);