diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java index cbc34619b6..37053a8e63 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java @@ -41,11 +41,15 @@ public class AMQPNativeOutboundTransformer implements OutboundTransformer { } static EncodedMessage transform(OutboundTransformer options, ActiveMQBytesMessage message) throws JMSException { - long messageFormat; - try { - messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT); - } catch (MessageFormatException e) { - return null; + final long messageFormat; + if (message.propertyExists(JMS_AMQP_MESSAGE_FORMAT)) { + try { + messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT); + } catch (MessageFormatException e) { + return null; + } + } else { + messageFormat = 0; } Binary encodedMessage = getBinaryFromMessageBody(message); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index 5b220994fa..f3db5e4eca 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -102,11 +102,17 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { public static final byte TEMP_QUEUE_TYPE = 0x02; public static final byte TEMP_TOPIC_TYPE = 0x03; + private final UTF8BufferType utf8BufferEncoding; + // For now Proton requires that we create a decoder to create an encoder private final DecoderImpl decoder = new DecoderImpl(); private final EncoderImpl encoder = new EncoderImpl(decoder); { AMQPDefinedTypes.registerAllTypes(decoder, encoder); + + utf8BufferEncoding = new UTF8BufferType(encoder, decoder); + + encoder.register(utf8BufferEncoding); } @Override @@ -159,7 +165,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { } properties.setTo(destination.getQualifiedName()); if (maMap == null) { - maMap = new HashMap(); + maMap = new HashMap<>(); } maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); } @@ -170,7 +176,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { } properties.setReplyTo(replyTo.getQualifiedName()); if (maMap == null) { - maMap = new HashMap(); + maMap = new HashMap<>(); } maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); } @@ -276,7 +282,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { continue; } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { if (maMap == null) { - maMap = new HashMap(); + maMap = new HashMap<>(); } String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); maMap.put(Symbol.valueOf(name), value); @@ -307,14 +313,14 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { continue; } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { if (daMap == null) { - daMap = new HashMap(); + daMap = new HashMap<>(); } String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); daMap.put(Symbol.valueOf(name), value); continue; } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { if (footerMap == null) { - footerMap = new HashMap(); + footerMap = new HashMap<>(); } String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); footerMap.put(name, value); @@ -328,7 +334,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { // The property didn't map into any other slot so we store it in the // Application Properties section of the message. if (apMap == null) { - apMap = new HashMap(); + apMap = new HashMap<>(); } apMap.put(key, value); } @@ -409,7 +415,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { } else if (messageType == CommandTypes.ACTIVEMQ_MAP_MESSAGE) { body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message)); } else if (messageType == CommandTypes.ACTIVEMQ_STREAM_MESSAGE) { - ArrayList list = new ArrayList(); + ArrayList list = new ArrayList<>(); final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message; try { while (true) { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/UTF8BufferType.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/UTF8BufferType.java new file mode 100644 index 0000000000..387032ec8e --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/UTF8BufferType.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.EncodingCodes; +import org.apache.qpid.proton.codec.PrimitiveType; +import org.apache.qpid.proton.codec.PrimitiveTypeEncoding; +import org.apache.qpid.proton.codec.TypeEncoding; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.fusesource.hawtbuf.UTF8Buffer; + +/** + * AMQP Type used to allow to proton-j codec to deal with UTF8Buffer types as if + * they were String elements. + */ +public class UTF8BufferType implements PrimitiveType { + + private final UTF8BufferEncoding largeBufferEncoding; + private final UTF8BufferEncoding smallBufferEncoding; + + public UTF8BufferType(EncoderImpl encoder, DecoderImpl decoder) { + this.largeBufferEncoding = new LargeUTF8BufferEncoding(encoder, decoder); + this.smallBufferEncoding = new SmallUTF8BufferEncoding(encoder, decoder); + } + + @Override + public Class getTypeClass() { + return UTF8Buffer.class; + } + + @Override + public PrimitiveTypeEncoding getEncoding(UTF8Buffer value) { + return value.getLength() <= 255 ? smallBufferEncoding : largeBufferEncoding; + } + + @Override + public PrimitiveTypeEncoding getCanonicalEncoding() { + return largeBufferEncoding; + } + + @Override + public Collection> getAllEncodings() { + return Arrays.asList(smallBufferEncoding, largeBufferEncoding); + } + + @Override + public void write(UTF8Buffer value) { + final TypeEncoding encoding = getEncoding(value); + encoding.writeConstructor(); + encoding.writeValue(value); + } + + public abstract class UTF8BufferEncoding implements PrimitiveTypeEncoding { + + private final EncoderImpl encoder; + private final DecoderImpl decoder; + + public UTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) { + this.encoder = encoder; + this.decoder = decoder; + } + + @Override + public int getConstructorSize() { + return 1; + } + + @Override + public boolean isFixedSizeVal() { + return false; + } + + @Override + public boolean encodesJavaPrimitive() { + return false; + } + + /** + * @return the number of bytes the size portion of the encoded value requires. + */ + public abstract int getSizeBytes(); + + @Override + public void writeConstructor() { + getEncoder().writeRaw(getEncodingCode()); + } + + @Override + public void writeValue(UTF8Buffer value) { + writeSize(value); + WritableBuffer buffer = getEncoder().getBuffer(); + buffer.put(value.getData(), value.getOffset(), value.getLength()); + } + + /** + * Write the size of the buffer using the appropriate type (byte or int) depending + * on the encoding type being used. + * + * @param value + * The UTF8Buffer value that is being encoded. + */ + public abstract void writeSize(UTF8Buffer value); + + @Override + public int getValueSize(UTF8Buffer value) { + return getSizeBytes() + value.getLength(); + } + + @Override + public Class getTypeClass() { + return UTF8Buffer.class; + } + + @Override + public PrimitiveType getType() { + return UTF8BufferType.this; + } + + @Override + public boolean encodesSuperset(TypeEncoding encoding) { + return (getType() == encoding.getType()); + } + + @Override + public UTF8Buffer readValue() { + throw new UnsupportedOperationException("No decoding to UTF8Buffer exists"); + } + + @Override + public void skipValue() { + throw new UnsupportedOperationException("No decoding to UTF8Buffer exists"); + } + + public DecoderImpl getDecoder() { + return decoder; + } + + public EncoderImpl getEncoder() { + return encoder; + } + } + + public class LargeUTF8BufferEncoding extends UTF8BufferEncoding { + + public LargeUTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) { + super(encoder, decoder); + } + + @Override + public byte getEncodingCode() { + return EncodingCodes.STR32; + } + + @Override + public int getSizeBytes() { + return Integer.BYTES; + } + + @Override + public void writeSize(UTF8Buffer value) { + getEncoder().getBuffer().putInt(value.getLength()); + } + } + + public class SmallUTF8BufferEncoding extends UTF8BufferEncoding { + + public SmallUTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) { + super(encoder, decoder); + } + + @Override + public byte getEncodingCode() { + return EncodingCodes.STR8; + } + + @Override + public int getSizeBytes() { + return Byte.BYTES; + } + + @Override + public void writeSize(UTF8Buffer value) { + getEncoder().getBuffer().put((byte) value.getLength()); + } + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 17185a0d79..9b75c1a4c8 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -17,7 +17,6 @@ package org.apache.activemq.transport.amqp.protocol; import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; -import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT; import java.io.IOException; import java.util.LinkedList; @@ -449,11 +448,7 @@ public class AmqpSender extends AmqpAbstractLink { ActiveMQMessage temp = null; if (md.getMessage() != null) { - temp = (ActiveMQMessage) md.getMessage(); - if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) { - temp = (ActiveMQMessage) md.getMessage().copy(); - temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0); - } + temp = (ActiveMQMessage) md.getMessage().copy(); } final ActiveMQMessage jms = temp; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 97ce106b04..c240398bc4 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -36,6 +36,7 @@ import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -99,6 +100,33 @@ public class JMSClientTest extends JMSClientTestSupport { } } + @Test(timeout = 60000) + public void testSendJMSMapMessage() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + + connection = createConnection(); + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + MapMessage message = session.createMapMessage(); + message.setBoolean("Boolean", false); + message.setString("STRING", "TEST"); + producer.send(message); + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + + MessageConsumer consumer = session.createConsumer(queue); + Message received = consumer.receive(5000); + assertNotNull(received); + assertTrue(received instanceof MapMessage); + MapMessage map = (MapMessage) received; + assertEquals("TEST", map.getString("STRING")); + assertEquals(false, map.getBooleanProperty("Boolean")); + } + } + @Test(timeout=30000) public void testAnonymousProducerConsume() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/UTF8BufferTypeTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/UTF8BufferTypeTest.java new file mode 100644 index 0000000000..c9bc52c2a4 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/UTF8BufferTypeTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import org.apache.qpid.proton.codec.AMQPDefinedTypes; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.EncodingCodes; +import org.apache.qpid.proton.codec.PrimitiveTypeEncoding; +import org.apache.qpid.proton.codec.ReadableBuffer; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the UTF8Buffer type encoder + */ +public class UTF8BufferTypeTest { + + private final UTF8BufferType utf8BufferEncoding; + private final DecoderImpl decoder = new DecoderImpl(); + private final EncoderImpl encoder = new EncoderImpl(decoder); + { + AMQPDefinedTypes.registerAllTypes(decoder, encoder); + + utf8BufferEncoding = new UTF8BufferType(encoder, decoder); + + encoder.register(utf8BufferEncoding); + } + + private String smallString = UUID.randomUUID().toString(); + private String largeString = UUID.randomUUID().toString() + UUID.randomUUID().toString() + + UUID.randomUUID().toString() + UUID.randomUUID().toString() + + UUID.randomUUID().toString() + UUID.randomUUID().toString() + + UUID.randomUUID().toString() + UUID.randomUUID().toString(); + + private UTF8Buffer smallBuffer; + private UTF8Buffer largeBuffer; + + @Before + public void setUp() { + smallBuffer = new UTF8Buffer(smallString.getBytes(StandardCharsets.UTF_8)); + largeBuffer = new UTF8Buffer(largeString.getBytes(StandardCharsets.UTF_8)); + } + + @Test + public void testGetAllEncodings() { + assertEquals(2, utf8BufferEncoding.getAllEncodings().size()); + } + + @Test + public void testGetTypeClass() { + assertEquals(UTF8Buffer.class, utf8BufferEncoding.getTypeClass()); + } + + @Test + public void testGetCanonicalEncoding() { + assertNotNull(utf8BufferEncoding.getCanonicalEncoding()); + } + + @Test + public void testGetEncodingForSmallUTF8Buffer() { + PrimitiveTypeEncoding encoding = utf8BufferEncoding.getEncoding(smallBuffer); + + assertTrue(encoding instanceof UTF8BufferType.SmallUTF8BufferEncoding); + assertEquals(1, encoding.getConstructorSize()); + assertEquals(smallBuffer.getLength() + Byte.BYTES, encoding.getValueSize(smallBuffer)); + assertEquals(EncodingCodes.STR8, encoding.getEncodingCode()); + assertFalse(encoding.encodesJavaPrimitive()); + assertEquals(utf8BufferEncoding, encoding.getType()); + } + + @Test + public void testGetEncodingForLargeUTF8Buffer() { + PrimitiveTypeEncoding encoding = utf8BufferEncoding.getEncoding(largeBuffer); + + assertTrue(encoding instanceof UTF8BufferType.LargeUTF8BufferEncoding); + assertEquals(1, encoding.getConstructorSize()); + assertEquals(largeBuffer.getLength() + Integer.BYTES, encoding.getValueSize(largeBuffer)); + assertEquals(EncodingCodes.STR32, encoding.getEncodingCode()); + assertFalse(encoding.encodesJavaPrimitive()); + assertEquals(utf8BufferEncoding, encoding.getType()); + } + + @Test + public void testEncodeDecodeEmptyStringBuffer() { + final AmqpWritableBuffer buffer = new AmqpWritableBuffer(); + encoder.setByteBuffer(buffer); + encoder.writeObject(new UTF8Buffer("")); + + byte[] copy = new byte[buffer.getArrayLength()]; + System.arraycopy(buffer.getArray(), 0, copy, 0, buffer.getArrayLength()); + + ReadableBuffer encoded = ReadableBuffer.ByteBufferReader.wrap(copy); + decoder.setBuffer(encoded); + + Object valueRead = decoder.readObject(); + assertTrue(valueRead instanceof String); + String decodedString = (String) valueRead; + assertEquals("", decodedString); + } + + @Test + public void testEncodeDecodeSmallBuffer() { + final AmqpWritableBuffer buffer = new AmqpWritableBuffer(); + encoder.setByteBuffer(buffer); + encoder.writeObject(smallBuffer); + + byte[] copy = new byte[buffer.getArrayLength()]; + System.arraycopy(buffer.getArray(), 0, copy, 0, buffer.getArrayLength()); + + ReadableBuffer encoded = ReadableBuffer.ByteBufferReader.wrap(copy); + decoder.setBuffer(encoded); + + Object valueRead = decoder.readObject(); + assertTrue(valueRead instanceof String); + String decodedString = (String) valueRead; + assertEquals(smallString, decodedString); + } + + @Test + public void testEncodeDecodeLargeBuffer() { + final AmqpWritableBuffer buffer = new AmqpWritableBuffer(); + encoder.setByteBuffer(buffer); + encoder.writeObject(largeBuffer); + + byte[] copy = new byte[buffer.getArrayLength()]; + System.arraycopy(buffer.getArray(), 0, copy, 0, buffer.getArrayLength()); + + ReadableBuffer encoded = ReadableBuffer.ByteBufferReader.wrap(copy); + decoder.setBuffer(encoded); + + Object valueRead = decoder.readObject(); + assertTrue(valueRead instanceof String); + String decodedString = (String) valueRead; + assertEquals(largeString, decodedString); + } +}