AMQ-7001 Resolve issues with encode failures on copied messages

Ensure that messages are copied to avoid contention on message content
when concurrent store and dispatch is enabled and better handle the AMQP
message type value.  Adds an AMQP Encoder for UTF8Buffer to encode down
to AMQP String encodings to allow for encoded OpenWire messages such as
MapMessage which can contain UTF8Buffer instances for String keys and
values.
This commit is contained in:
Timothy Bish 2018-08-01 18:13:37 -04:00
parent d1c3c4814e
commit 9ec6ee43b1
6 changed files with 416 additions and 18 deletions

View File

@ -41,12 +41,16 @@ public class AMQPNativeOutboundTransformer implements OutboundTransformer {
} }
static EncodedMessage transform(OutboundTransformer options, ActiveMQBytesMessage message) throws JMSException { static EncodedMessage transform(OutboundTransformer options, ActiveMQBytesMessage message) throws JMSException {
long messageFormat; final long messageFormat;
if (message.propertyExists(JMS_AMQP_MESSAGE_FORMAT)) {
try { try {
messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT); messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
} catch (MessageFormatException e) { } catch (MessageFormatException e) {
return null; return null;
} }
} else {
messageFormat = 0;
}
Binary encodedMessage = getBinaryFromMessageBody(message); Binary encodedMessage = getBinaryFromMessageBody(message);
byte encodedData[] = encodedMessage.getArray(); byte encodedData[] = encodedMessage.getArray();

View File

@ -102,11 +102,17 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
public static final byte TEMP_QUEUE_TYPE = 0x02; public static final byte TEMP_QUEUE_TYPE = 0x02;
public static final byte TEMP_TOPIC_TYPE = 0x03; public static final byte TEMP_TOPIC_TYPE = 0x03;
private final UTF8BufferType utf8BufferEncoding;
// For now Proton requires that we create a decoder to create an encoder // For now Proton requires that we create a decoder to create an encoder
private final DecoderImpl decoder = new DecoderImpl(); private final DecoderImpl decoder = new DecoderImpl();
private final EncoderImpl encoder = new EncoderImpl(decoder); private final EncoderImpl encoder = new EncoderImpl(decoder);
{ {
AMQPDefinedTypes.registerAllTypes(decoder, encoder); AMQPDefinedTypes.registerAllTypes(decoder, encoder);
utf8BufferEncoding = new UTF8BufferType(encoder, decoder);
encoder.register(utf8BufferEncoding);
} }
@Override @Override
@ -159,7 +165,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
} }
properties.setTo(destination.getQualifiedName()); properties.setTo(destination.getQualifiedName());
if (maMap == null) { if (maMap == null) {
maMap = new HashMap<Symbol, Object>(); maMap = new HashMap<>();
} }
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
} }
@ -170,7 +176,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
} }
properties.setReplyTo(replyTo.getQualifiedName()); properties.setReplyTo(replyTo.getQualifiedName());
if (maMap == null) { if (maMap == null) {
maMap = new HashMap<Symbol, Object>(); maMap = new HashMap<>();
} }
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
} }
@ -276,7 +282,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
continue; continue;
} else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
if (maMap == null) { if (maMap == null) {
maMap = new HashMap<Symbol, Object>(); maMap = new HashMap<>();
} }
String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), value); maMap.put(Symbol.valueOf(name), value);
@ -307,14 +313,14 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
continue; continue;
} else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
if (daMap == null) { if (daMap == null) {
daMap = new HashMap<Symbol, Object>(); daMap = new HashMap<>();
} }
String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), value); daMap.put(Symbol.valueOf(name), value);
continue; continue;
} else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
if (footerMap == null) { if (footerMap == null) {
footerMap = new HashMap<Object, Object>(); footerMap = new HashMap<>();
} }
String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
footerMap.put(name, value); footerMap.put(name, value);
@ -328,7 +334,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
// The property didn't map into any other slot so we store it in the // The property didn't map into any other slot so we store it in the
// Application Properties section of the message. // Application Properties section of the message.
if (apMap == null) { if (apMap == null) {
apMap = new HashMap<String, Object>(); apMap = new HashMap<>();
} }
apMap.put(key, value); apMap.put(key, value);
} }
@ -409,7 +415,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
} else if (messageType == CommandTypes.ACTIVEMQ_MAP_MESSAGE) { } else if (messageType == CommandTypes.ACTIVEMQ_MAP_MESSAGE) {
body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message)); body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message));
} else if (messageType == CommandTypes.ACTIVEMQ_STREAM_MESSAGE) { } else if (messageType == CommandTypes.ACTIVEMQ_STREAM_MESSAGE) {
ArrayList<Object> list = new ArrayList<Object>(); ArrayList<Object> list = new ArrayList<>();
final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message; final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message;
try { try {
while (true) { while (true) {

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
import java.util.Arrays;
import java.util.Collection;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.PrimitiveType;
import org.apache.qpid.proton.codec.PrimitiveTypeEncoding;
import org.apache.qpid.proton.codec.TypeEncoding;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.fusesource.hawtbuf.UTF8Buffer;
/**
* AMQP Type used to allow to proton-j codec to deal with UTF8Buffer types as if
* they were String elements.
*/
public class UTF8BufferType implements PrimitiveType<UTF8Buffer> {
private final UTF8BufferEncoding largeBufferEncoding;
private final UTF8BufferEncoding smallBufferEncoding;
public UTF8BufferType(EncoderImpl encoder, DecoderImpl decoder) {
this.largeBufferEncoding = new LargeUTF8BufferEncoding(encoder, decoder);
this.smallBufferEncoding = new SmallUTF8BufferEncoding(encoder, decoder);
}
@Override
public Class<UTF8Buffer> getTypeClass() {
return UTF8Buffer.class;
}
@Override
public PrimitiveTypeEncoding<UTF8Buffer> getEncoding(UTF8Buffer value) {
return value.getLength() <= 255 ? smallBufferEncoding : largeBufferEncoding;
}
@Override
public PrimitiveTypeEncoding<UTF8Buffer> getCanonicalEncoding() {
return largeBufferEncoding;
}
@Override
public Collection<? extends PrimitiveTypeEncoding<UTF8Buffer>> getAllEncodings() {
return Arrays.asList(smallBufferEncoding, largeBufferEncoding);
}
@Override
public void write(UTF8Buffer value) {
final TypeEncoding<UTF8Buffer> encoding = getEncoding(value);
encoding.writeConstructor();
encoding.writeValue(value);
}
public abstract class UTF8BufferEncoding implements PrimitiveTypeEncoding<UTF8Buffer> {
private final EncoderImpl encoder;
private final DecoderImpl decoder;
public UTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) {
this.encoder = encoder;
this.decoder = decoder;
}
@Override
public int getConstructorSize() {
return 1;
}
@Override
public boolean isFixedSizeVal() {
return false;
}
@Override
public boolean encodesJavaPrimitive() {
return false;
}
/**
* @return the number of bytes the size portion of the encoded value requires.
*/
public abstract int getSizeBytes();
@Override
public void writeConstructor() {
getEncoder().writeRaw(getEncodingCode());
}
@Override
public void writeValue(UTF8Buffer value) {
writeSize(value);
WritableBuffer buffer = getEncoder().getBuffer();
buffer.put(value.getData(), value.getOffset(), value.getLength());
}
/**
* Write the size of the buffer using the appropriate type (byte or int) depending
* on the encoding type being used.
*
* @param value
* The UTF8Buffer value that is being encoded.
*/
public abstract void writeSize(UTF8Buffer value);
@Override
public int getValueSize(UTF8Buffer value) {
return getSizeBytes() + value.getLength();
}
@Override
public Class<UTF8Buffer> getTypeClass() {
return UTF8Buffer.class;
}
@Override
public PrimitiveType<UTF8Buffer> getType() {
return UTF8BufferType.this;
}
@Override
public boolean encodesSuperset(TypeEncoding<UTF8Buffer> encoding) {
return (getType() == encoding.getType());
}
@Override
public UTF8Buffer readValue() {
throw new UnsupportedOperationException("No decoding to UTF8Buffer exists");
}
@Override
public void skipValue() {
throw new UnsupportedOperationException("No decoding to UTF8Buffer exists");
}
public DecoderImpl getDecoder() {
return decoder;
}
public EncoderImpl getEncoder() {
return encoder;
}
}
public class LargeUTF8BufferEncoding extends UTF8BufferEncoding {
public LargeUTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) {
super(encoder, decoder);
}
@Override
public byte getEncodingCode() {
return EncodingCodes.STR32;
}
@Override
public int getSizeBytes() {
return Integer.BYTES;
}
@Override
public void writeSize(UTF8Buffer value) {
getEncoder().getBuffer().putInt(value.getLength());
}
}
public class SmallUTF8BufferEncoding extends UTF8BufferEncoding {
public SmallUTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) {
super(encoder, decoder);
}
@Override
public byte getEncodingCode() {
return EncodingCodes.STR8;
}
@Override
public int getSizeBytes() {
return Byte.BYTES;
}
@Override
public void writeSize(UTF8Buffer value) {
getEncoder().getBuffer().put((byte) value.getLength());
}
}
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.transport.amqp.protocol; package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;
@ -449,11 +448,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
ActiveMQMessage temp = null; ActiveMQMessage temp = null;
if (md.getMessage() != null) { if (md.getMessage() != null) {
temp = (ActiveMQMessage) md.getMessage();
if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) {
temp = (ActiveMQMessage) md.getMessage().copy(); temp = (ActiveMQMessage) md.getMessage().copy();
temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0);
}
} }
final ActiveMQMessage jms = temp; final ActiveMQMessage jms = temp;

View File

@ -36,6 +36,7 @@ import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
@ -99,6 +100,33 @@ public class JMSClientTest extends JMSClientTestSupport {
} }
} }
@Test(timeout = 60000)
public void testSendJMSMapMessage() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(name.getMethodName());
MessageProducer producer = session.createProducer(queue);
MapMessage message = session.createMapMessage();
message.setBoolean("Boolean", false);
message.setString("STRING", "TEST");
producer.send(message);
QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
assertEquals(1, proxy.getQueueSize());
MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(5000);
assertNotNull(received);
assertTrue(received instanceof MapMessage);
MapMessage map = (MapMessage) received;
assertEquals("TEST", map.getString("STRING"));
assertEquals(false, map.getBooleanProperty("Boolean"));
}
}
@Test(timeout=30000) @Test(timeout=30000)
public void testAnonymousProducerConsume() throws Exception { public void testAnonymousProducerConsume() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing(); ActiveMQAdmin.enableJMSFrameTracing();

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.PrimitiveTypeEncoding;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.junit.Before;
import org.junit.Test;
/**
* Test the UTF8Buffer type encoder
*/
public class UTF8BufferTypeTest {
private final UTF8BufferType utf8BufferEncoding;
private final DecoderImpl decoder = new DecoderImpl();
private final EncoderImpl encoder = new EncoderImpl(decoder);
{
AMQPDefinedTypes.registerAllTypes(decoder, encoder);
utf8BufferEncoding = new UTF8BufferType(encoder, decoder);
encoder.register(utf8BufferEncoding);
}
private String smallString = UUID.randomUUID().toString();
private String largeString = UUID.randomUUID().toString() + UUID.randomUUID().toString() +
UUID.randomUUID().toString() + UUID.randomUUID().toString() +
UUID.randomUUID().toString() + UUID.randomUUID().toString() +
UUID.randomUUID().toString() + UUID.randomUUID().toString();
private UTF8Buffer smallBuffer;
private UTF8Buffer largeBuffer;
@Before
public void setUp() {
smallBuffer = new UTF8Buffer(smallString.getBytes(StandardCharsets.UTF_8));
largeBuffer = new UTF8Buffer(largeString.getBytes(StandardCharsets.UTF_8));
}
@Test
public void testGetAllEncodings() {
assertEquals(2, utf8BufferEncoding.getAllEncodings().size());
}
@Test
public void testGetTypeClass() {
assertEquals(UTF8Buffer.class, utf8BufferEncoding.getTypeClass());
}
@Test
public void testGetCanonicalEncoding() {
assertNotNull(utf8BufferEncoding.getCanonicalEncoding());
}
@Test
public void testGetEncodingForSmallUTF8Buffer() {
PrimitiveTypeEncoding<UTF8Buffer> encoding = utf8BufferEncoding.getEncoding(smallBuffer);
assertTrue(encoding instanceof UTF8BufferType.SmallUTF8BufferEncoding);
assertEquals(1, encoding.getConstructorSize());
assertEquals(smallBuffer.getLength() + Byte.BYTES, encoding.getValueSize(smallBuffer));
assertEquals(EncodingCodes.STR8, encoding.getEncodingCode());
assertFalse(encoding.encodesJavaPrimitive());
assertEquals(utf8BufferEncoding, encoding.getType());
}
@Test
public void testGetEncodingForLargeUTF8Buffer() {
PrimitiveTypeEncoding<UTF8Buffer> encoding = utf8BufferEncoding.getEncoding(largeBuffer);
assertTrue(encoding instanceof UTF8BufferType.LargeUTF8BufferEncoding);
assertEquals(1, encoding.getConstructorSize());
assertEquals(largeBuffer.getLength() + Integer.BYTES, encoding.getValueSize(largeBuffer));
assertEquals(EncodingCodes.STR32, encoding.getEncodingCode());
assertFalse(encoding.encodesJavaPrimitive());
assertEquals(utf8BufferEncoding, encoding.getType());
}
@Test
public void testEncodeDecodeEmptyStringBuffer() {
final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
encoder.setByteBuffer(buffer);
encoder.writeObject(new UTF8Buffer(""));
byte[] copy = new byte[buffer.getArrayLength()];
System.arraycopy(buffer.getArray(), 0, copy, 0, buffer.getArrayLength());
ReadableBuffer encoded = ReadableBuffer.ByteBufferReader.wrap(copy);
decoder.setBuffer(encoded);
Object valueRead = decoder.readObject();
assertTrue(valueRead instanceof String);
String decodedString = (String) valueRead;
assertEquals("", decodedString);
}
@Test
public void testEncodeDecodeSmallBuffer() {
final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
encoder.setByteBuffer(buffer);
encoder.writeObject(smallBuffer);
byte[] copy = new byte[buffer.getArrayLength()];
System.arraycopy(buffer.getArray(), 0, copy, 0, buffer.getArrayLength());
ReadableBuffer encoded = ReadableBuffer.ByteBufferReader.wrap(copy);
decoder.setBuffer(encoded);
Object valueRead = decoder.readObject();
assertTrue(valueRead instanceof String);
String decodedString = (String) valueRead;
assertEquals(smallString, decodedString);
}
@Test
public void testEncodeDecodeLargeBuffer() {
final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
encoder.setByteBuffer(buffer);
encoder.writeObject(largeBuffer);
byte[] copy = new byte[buffer.getArrayLength()];
System.arraycopy(buffer.getArray(), 0, copy, 0, buffer.getArrayLength());
ReadableBuffer encoded = ReadableBuffer.ByteBufferReader.wrap(copy);
decoder.setBuffer(encoded);
Object valueRead = decoder.readObject();
assertTrue(valueRead instanceof String);
String decodedString = (String) valueRead;
assertEquals(largeString, decodedString);
}
}